diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java index 51cb4a3..ba31d9e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java @@ -21,32 +21,38 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Composition of services. + *

+ * Child services will have their {@link Service#init(Configuration)}, + * {@link Service#start()} and {@link Service#stop()} lifecycle operations + * invoked to follow this parent service's lifecycle. */ @Public @Evolving -public class CompositeService extends AbstractService { +public class CompositeService extends AbstractService implements ServiceParent { - private static final Log LOG = LogFactory.getLog(CompositeService.class); + private static final Logger LOG = + LoggerFactory.getLogger(CompositeService.class); /** * Policy on shutdown: attempt to close everything (purest) or * only try to close started services (which assumes - * that the service implementations may not handle the stop() operation + * that the service implementations may not handle the stop() operation * except when started. * Irrespective of this policy, if a child service fails during - * its init() or start() operations, it will have stop() called on it. + * its init() or start() operations, + * it will have stop() called on it. */ protected static final boolean STOP_ONLY_STARTED_SERVICES = false; - private final List serviceList = new ArrayList(); + private final List serviceList = new ArrayList<>(); public CompositeService(String name) { super(name); @@ -59,7 +65,7 @@ public CompositeService(String name) { */ public List getServices() { synchronized (serviceList) { - return new ArrayList(serviceList); + return new ArrayList<>(serviceList); } } @@ -68,10 +74,8 @@ public CompositeService(String name) { * {@link CompositeService} * @param service the {@link Service} to be added */ - protected void addService(Service service) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding service " + service.getName()); - } + public void addService(Service service) { + LOG.debug("Adding service {}", service.getName()); synchronized (serviceList) { serviceList.add(service); } @@ -92,17 +96,25 @@ protected boolean addIfService(Object object) { } } + /** + * Remove a service + * @param service service to remove + * @return true if the service was on the list of child services + */ protected synchronized boolean removeService(Service service) { synchronized (serviceList) { return serviceList.remove(service); } } + /** + * Initialize all child services. + * @param conf configuration + * @throws Exception + */ protected void serviceInit(Configuration conf) throws Exception { List services = getServices(); - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": initing services, size=" + services.size()); - } + LOG.debug("{}: initing services, size={}", getName() , services.size()); for (Service service : services) { service.init(conf); } @@ -111,9 +123,7 @@ protected void serviceInit(Configuration conf) throws Exception { protected void serviceStart() throws Exception { List services = getServices(); - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": starting services, size=" + services.size()); - } + LOG.debug("{}: starting services, size={}", getName(), services.size()); for (Service service : services) { // start the service. 
If this fails that service // will be stopped and an exception raised @@ -125,15 +135,13 @@ protected void serviceStart() throws Exception { protected void serviceStop() throws Exception { //stop all services that were started int numOfServicesToStop = serviceList.size(); - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop); - } + LOG.debug("{}: stopping services, size={}", getName(), numOfServicesToStop); stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES); super.serviceStop(); } /** - * Stop the services in reverse order + * Stop the services in reverse order. * * @param numOfServicesStarted index from where the stop should work * @param stopOnlyStartedServices flag to say "only start services that are @@ -147,9 +155,7 @@ private void stop(int numOfServicesStarted, boolean stopOnlyStartedServices) { List services = getServices(); for (int i = numOfServicesStarted - 1; i >= 0; i--) { Service service = services.get(i); - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping service #" + i + ": " + service); - } + LOG.debug("Stopping service #{} : {}", i, service); STATE state = service.getServiceState(); //depending on the stop police if (state == STATE.STARTED diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java index 6c03e25..644346e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.slf4j.Logger; /** * This class contains a set of methods to work with services, especially @@ -87,6 +88,27 @@ public static Exception stopQuietly(Log log, Service service) { return null; } + /** + * Stop a service; if it is null do nothing. Exceptions are caught and + * logged at warn level. (but not Throwables). This operation is intended to + * be used in cleanup operations + * + * @param log the log to warn at + * @param service a service; may be null + * @return any exception that was caught; null if none was. + * @see ServiceOperations#stopQuietly(Service) + */ + public static Exception stopQuietly(Logger log, Service service) { + try { + stop(service); + } catch (Exception e) { + log.warn("When stopping the service {}: {}" , service.getName(), e, e); + return e; + } + return null; + } + + /** * Class to manage a list of {@link ServiceStateChangeListener} instances, @@ -99,7 +121,7 @@ public static Exception stopQuietly(Log log, Service service) { * that it will never be null. */ private final List listeners = - new ArrayList(); + new ArrayList<>(); /** * Thread-safe addition of a new listener to the end of a list. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceParent.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceParent.java new file mode 100644 index 0000000..95bd787 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceParent.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service; + +import java.util.List; + +/** + * Interface for accessing services that contain one or more child + * services. + */ +public interface ServiceParent extends Service { + + /** + * Add a child service. It must be in a consistent state with the + * service to which it is being added. + * @param service the service to add. + */ + void addService(Service service); + + /** + * Get an unmodifiable list of services + * @return a list of child services at the time of invocation - + * added services will not be picked up. + */ + List getServices(); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/AbstractWorkflowExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/AbstractWorkflowExecutorService.java new file mode 100644 index 0000000..02df3b9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/AbstractWorkflowExecutorService.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** + * An abstract service that hosts an executor. When the service is stopped, + * {@link ExecutorService#shutdownNow()} is invoked. + *

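A rough sketch of how a subclass might supply and use the executor, assuming only the API added in this patch; the class name and the task it runs are invented for illustration:

```java
import org.apache.hadoop.service.workflow.AbstractWorkflowExecutorService;
import org.apache.hadoop.service.workflow.ServiceThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PingService extends AbstractWorkflowExecutorService {
  private static final Logger LOG = LoggerFactory.getLogger(PingService.class);

  public PingService() {
    // hand the service an executor to own; it is shut down via shutdownNow() in serviceStop()
    super("PingService", ServiceThreadFactory.singleThreadExecutor("ping", true));
  }

  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    // work submitted here runs on the executor owned by the parent class
    execute(() -> LOG.info("ping"));
  }
}
```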
+ * The executor itself is not created: it must be set in the constructor + * or in {@link #setExecutor(ExecutorService)} + */ +public abstract class AbstractWorkflowExecutorService extends AbstractService { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractWorkflowExecutorService.class); + + /** + * The executor. + */ + private ExecutorService executor; + + /** + * Construct an instance with the given name -but + * no executor + * @param name service name + */ + protected AbstractWorkflowExecutorService(String name) { + this(name, null); + } + + /** + * Construct an instance with the given name and executor + * @param name service name + * @param executor exectuor + */ + protected AbstractWorkflowExecutorService(String name, + ExecutorService executor) { + super(name); + this.executor = executor; + } + + /** + * Get the executor + * @return the executor + */ + public synchronized ExecutorService getExecutor() { + return executor; + } + + /** + * Set the executor. This is protected as it + * is intended to be restricted to subclasses + * @param executor executor + */ + protected synchronized void setExecutor(ExecutorService executor) { + this.executor = executor; + } + + /** + * Execute the runnable with the executor (which + * must have been created already) + * @param runnable runnable to execute + */ + public void execute(Runnable runnable) { + getExecutor().execute(runnable); + } + + /** + * Submit a callable + * @param callable callable + * @param type of the final get + * @return a future to wait on + */ + public Future submit(Callable callable) { + return getExecutor().submit(callable); + } + + /** + * Stop the service: halt any executor. + * @throws Exception exception. + */ + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + stopExecutor(); + } + + /** + * Stop the executor if it is not null. + * This uses {@link ExecutorService#shutdownNow()} + * and so does not block until they have completed. + */ + protected synchronized void stopExecutor() { + if (executor != null) { + LOG.debug("Stopping executor"); + executor.shutdownNow(); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcess.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcess.java new file mode 100644 index 0000000..f7ab263 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcess.java @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Execute a long-lived process. + * + *

+ * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing + * a short-lived application; this class allows the process to run for the + * life of the Java process that forked it. + * It is designed to be embedded inside a YARN service, though this is not + * the sole way that it can be used. + *

+ * Key Features: + *
+ * <ol>
+ *   <li>Output is streamed to the output logger provided.</li>
+ *   <li>The most recent lines of output are saved to a linked list.</li>
+ *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent},
+ *   is raised on the start and finish of a process.</li>
+ * </ol>
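A minimal usage sketch, assuming only the API added in this patch; the command line, logger name and class name below are illustrative:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.service.workflow.LongLivedProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TailExample {
  public static void main(String[] args) throws Exception {
    Logger shellLog = LoggerFactory.getLogger("shell.output");
    // fork "tail -f" and stream its stdout/stderr to shellLog
    LongLivedProcess process = new LongLivedProcess("tailer", shellLog,
        Arrays.asList("tail", "-f", "/var/log/syslog"));
    process.setEnv("LANG", "C");      // optional environment override
    process.start();                  // spawns the child and the stream reader
    Thread.sleep(5000);
    List<String> recent = process.getRecentOutput();  // rolling window of output
    process.stop();                   // destroys the child if still running
  }
}
```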
+ * + */ +public class LongLivedProcess implements Runnable { + /** + * Limit on number of lines to retain in the "recent" line list:{@value} + */ + public static final int RECENT_LINE_LOG_LIMIT = 64; + + /** + * Const defining the time in millis between polling for new text + */ + private static final int STREAM_READER_SLEEP_TIME = 200; + + /** + * limit on the length of a stream before it triggers an automatic newline + */ + private static final int LINE_LENGTH = 256; + private final ProcessBuilder processBuilder; + private Process process; + private Integer exitCode = null; + private final String description; + private final ExecutorService processExecutor; + private final ExecutorService logExecutor; + + private ProcessStreamReader processStreamReader; + //list of recent lines, recorded for extraction into reports + private final List recentLines = new LinkedList<>(); + private int recentLineLimit = RECENT_LINE_LOG_LIMIT; + private LongLivedProcessLifecycleEvent lifecycleCallback; + private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false); + + /** + * Log supplied in the constructor for the spawned process -accessible + * to inner classes + */ + private Logger processLog; + + /** + * Class log -accessible to inner classes + */ + private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class); + + /** + * Volatile flag to indicate that the process is done + */ + private volatile boolean finished; + + public LongLivedProcess(String name, + Logger processLog, + List commands) { + Preconditions.checkArgument(processLog != null, "null processLog"); + Preconditions.checkArgument(commands != null, "null command list"); + Preconditions.checkArgument(!commands.isEmpty(), "empty command list"); + String command = commands.get(0); + Preconditions.checkArgument(command != null && !command.isEmpty(), + "null or empty command list"); + + this.description = String.format("%s: \"%s\"", name, command); + this.processLog = processLog; + ServiceThreadFactory factory = new ServiceThreadFactory(name, true); + processExecutor = Executors.newSingleThreadExecutor(factory); + logExecutor = Executors.newSingleThreadExecutor(factory); + processBuilder = new ProcessBuilder(commands); + processBuilder.redirectErrorStream(false); + } + + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(description); + sb.append(", running=").append(isRunning()); + sb.append(", finished=").append(finished); + sb.append(", exitCode=").append(exitCode); + return sb.toString(); + } + + /** + * Set the limit on recent lines to retain + * @param recentLineLimit size of rolling list of recent lines. + */ + public void setRecentLineLimit(int recentLineLimit) { + this.recentLineLimit = recentLineLimit; + } + + /** + * Set an optional application exit callback + * @param lifecycleCallback callback to notify on application exit + */ + public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) { + this.lifecycleCallback = lifecycleCallback; + } + + /** + * Add an entry to the environment + * @param envVar envVar -must not be null + * @param val value + */ + public void setEnv(String envVar, String val) { + Preconditions.checkArgument(envVar != null, "envVar"); + Preconditions.checkArgument(val != null, "val"); + processBuilder.environment().put(envVar, val); + } + + /** + * Bulk set the environment from a map. This does + * not replace the existing environment, just extend it/overwrite single + * entries. 
+ * @param map map to add + */ + public void putEnvMap(Map map) { + for (Map.Entry entry : map.entrySet()) { + String val = entry.getValue(); + String key = entry.getKey(); + setEnv(key, val); + } + } + + /** + * Get the process environment + * @param variable environment variable + * @return the value or null if there is no match + */ + public String getEnv(String variable) { + return processBuilder.environment().get(variable); + } + + /** + * Set the process log. Ignored once the process starts + * @param processLog new log ... may be null + */ + public void setProcessLog(Logger processLog) { + this.processLog = processLog; + } + + /** + * Get the process reference + * @return the process -null if the process is not started + */ + public Process getProcess() { + return process; + } + + /** + * Get the process builder -this can be manipulated + * up to the start() operation. As there is no synchronization + * around it, it must only be used in the same thread setting up the commmand. + * @return the process builder + */ + public ProcessBuilder getProcessBuilder() { + return processBuilder; + } + + /** + * Get the command list + * @return the comands + */ + public List getCommands() { + return processBuilder.command(); + } + + /** + * Get the first command. Construction-time checks guarantee + * that this always returns a string. + * @return the command to execute + */ + public String getCommand() { + return getCommands().get(0); + } + + /** + * probe to see if the process is running + * @return true iff the process has been started and is not yet finished + */ + public boolean isRunning() { + return process != null && !finished; + } + + /** + * Get the exit code: null until the process has finished + * @return the exit code or null + */ + public Integer getExitCode() { + return exitCode; + } + + /** + * Get the exit code sign corrected: null until the process has finished + * @return the exit code or null + */ + public Integer getExitCodeSignCorrected() { + Integer result; + if (exitCode != null) { + result = signCorrectExitCode(exitCode); + } else { + result = null; + } + return result; + } + + /** + * Stop the process if it is running. + * This will trigger an application completion event with the given exit code + */ + public void stop() { + if (!isRunning()) { + return; + } + process.destroy(); + } + + /** + * Get a text description of the builder suitable for log output + * @return a multiline string + */ + protected String describeBuilder() { + StringBuilder buffer = new StringBuilder(); + for (String arg : processBuilder.command()) { + buffer.append('"').append(arg).append("\" "); + } + return buffer.toString(); + } + + /** + * Dump the environment to a string builder + * @param buffer the buffer to append to + */ + public void dumpEnv(StringBuilder buffer) { + buffer.append("\nEnvironment\n-----------"); + Map env = processBuilder.environment(); + Set keys = env.keySet(); + List sortedKeys = new ArrayList<>(keys); + Collections.sort(sortedKeys); + for (String key : sortedKeys) { + buffer.append(key).append("=").append(env.get(key)).append('\n'); + } + } + + /** + * Execute the process. 
+ * @return the process + * @throws IOException on any problem + * @throws FileNotFoundException if the process could not be found + */ + private Process spawnChildProcess() throws IOException { + if (process != null) { + throw new IOException("Process already started"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Spawning process:\n {}", describeBuilder()); + } + process = processBuilder.start(); + return process; + } + + /** + * Entry point for waiting for the program to finish. + */ + @Override // Runnable + public void run() { + LOG.debug("Lifecycle callback thread running"); + //notify the callback that the process has started + if (lifecycleCallback != null) { + lifecycleCallback.onProcessStarted(this); + } + try { + //close stdin for the process + IOUtils.closeStream(process.getOutputStream()); + exitCode = process.waitFor(); + } catch (InterruptedException e) { + LOG.debug("Process wait interrupted -exiting thread", e); + } finally { + //here the process has finished + LOG.debug("process {} has finished", description); + //tell the logger it has to finish too + finished = true; + + // shut down the threads + logExecutor.shutdown(); + try { + logExecutor.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + //ignored + LOG.debug("Interrupted while waiting for logExectuor to stop"); + } + + //now call the callback if it is set + if (lifecycleCallback != null) { + lifecycleCallback.onProcessExited(this, exitCode, + getExitCodeSignCorrected()); + } + } + } + + /** + * Spawn the application + * @throws IOException IO problems + */ + public void start() throws IOException { + + spawnChildProcess(); + processStreamReader = + new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME); + LOG.debug("Submitting process stream reader for execution on a thread"); + logExecutor.submit(processStreamReader); + LOG.debug("Submitting self for execution on a thread"); + processExecutor.submit(this); + } + + /** + * Get the lines of recent output + * @return the last few lines of output; an empty list if there are none + * or the process is not actually running + */ + public synchronized List getRecentOutput() { + return new ArrayList<>(recentLines); + } + + /** + * @return whether lines of recent output are empty + */ + public synchronized boolean isRecentOutputEmpty() { + return recentLines.isEmpty(); + } + + /** + * Query to see if the final output has been processed + * @return + */ + public boolean isFinalOutputProcessed() { + return finalOutputProcessed.get(); + } + + /** + * Get the recent output from the process, or [] if not defined + * + * @param finalOutput flag to indicate "wait for the final output of the process" + * @param duration the duration, in ms, + * ro wait for recent output to become non-empty + * @return a possibly empty list + */ + public List getRecentOutput(boolean finalOutput, int duration) { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start <= duration) { + boolean finishedOutput; + if (finalOutput) { + // final flag means block until all data is done + finishedOutput = isFinalOutputProcessed(); + } else { + // there is some output + finishedOutput = !isRecentOutputEmpty(); + } + if (finishedOutput) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + return getRecentOutput(); + } + + /** + * Add the recent line to the list of recent lines; deleting + * an earlier on if the limit is reached. + *

+ * Implementation note: yes, a circular array would be more + * efficient, especially with some power of two as the modulo, + * but is it worth the complexity and risk of errors for + * something that is only called once per line of IO? + * @param line line to record + * @param isErrorStream is the line from the error stream + * @param logger logger to log to - null for no logging + */ + private synchronized void recordRecentLine(String line, + boolean isErrorStream, + Logger logger) { + if (line == null) { + return; + } + String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line; + recentLines.add(entry); + if (recentLines.size() > recentLineLimit) { + recentLines.remove(0); + } + if (logger != null) { + if (isErrorStream) { + logger.warn(line); + } else { + logger.info(line); + } + } + } + + + /** + * Class to read data from the two process streams, and, when run in a thread + * to keep running until the done flag is set. + * Lines are fetched from stdout and stderr and logged at info and error + * respectively. + */ + + private class ProcessStreamReader implements Runnable { + private final Logger streamLog; + private final int sleepTime; + + private ProcessStreamReader(Logger streamLog, int sleepTime) { + this.streamLog = streamLog; + this.sleepTime = sleepTime; + } + + /** + * Return a character if there is one, -1 if nothing is ready yet + * @param reader reader + * @return the value from the reader, or -1 if it is not ready + * @throws IOException IO problems + */ + private int readCharNonBlocking(BufferedReader reader) throws IOException { + if (reader.ready()) { + return reader.read(); + } else { + return -1; + } + } + + /** + * Read in a line, or, if the limit has been reached, the buffer + * so far + * @param reader source of data + * @param line line to build + * @param limit limit of line length + * @return true if the line can be printed + * @throws IOException IO trouble + */ + @SuppressWarnings("NestedAssignment") + private boolean readAnyLine(BufferedReader reader, + StringBuilder line, + int limit) throws IOException { + int next; + while ((-1 != (next = readCharNonBlocking(reader)))) { + if (next != '\n') { + line.append((char) next); + limit--; + if (line.length() > limit) { + //enough has been read in to print it any + return true; + } + } else { + //line end return flag to say so + return true; + } + } + //here the end of the stream is hit, or the limit + return false; + } + + + @Override //Runnable + @SuppressWarnings("IOResourceOpenedButNotSafelyClosed") + public void run() { + StringBuilder outLine = new StringBuilder(LINE_LENGTH); + StringBuilder errorLine = new StringBuilder(LINE_LENGTH); + try(BufferedReader errReader = + new BufferedReader( + new InputStreamReader(process.getErrorStream())); + BufferedReader outReader = + new BufferedReader( + new InputStreamReader(process.getInputStream()))) { + while (!finished) { + boolean processed = false; + if (readAnyLine(errReader, errorLine, LINE_LENGTH)) { + recordRecentLine(errorLine.toString(), true, streamLog); + errorLine.setLength(0); + processed = true; + } + if (readAnyLine(outReader, outLine, LINE_LENGTH)) { + recordRecentLine(outLine.toString(), false, streamLog); + outLine.setLength(0); + processed |= true; + } + if (!processed && !finished) { + //nothing processed: wait a bit for data. 
+ try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + //ignore this, rely on the done flag + LOG.debug("Ignoring ", e); + } + } + } + // finished: cleanup + + //print the current error line then stream through the rest + recordFinalOutput(errReader, errorLine, true, streamLog); + //now do the info line + recordFinalOutput(outReader, outLine, false, streamLog); + + + } catch (Exception ignored) { + LOG.warn("encountered {}", ignored, ignored); + //process connection has been torn down + } finally { + //mark output as done + finalOutputProcessed.set(true); + } + } + } + + /** + * Record the final output of a process stream + * @param reader reader of output + * @param lineBuilder string builder into which line is built + * @param isErrorStream flag to indicate whether or not this is the + * is the line from the error stream + * @param logger logger to log to + * @throws IOException + */ + protected void recordFinalOutput(BufferedReader reader, + StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws + IOException { + String line = lineBuilder.toString(); + recordRecentLine(line, isErrorStream, logger); + line = reader.readLine(); + while (line != null) { + recordRecentLine(line, isErrorStream, logger); + line = reader.readLine(); + if (Thread.interrupted()) { + break; + } + } + } + + /** + * Sign correct an exit code. + * @param code an integer code value in the range 0-255. + * @return a code in the range -127 to +128. + */ + public static int signCorrectExitCode(int code) { + return (code << 24) >> 24; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcessLifecycleEvent.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcessLifecycleEvent.java new file mode 100644 index 0000000..be7a788 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcessLifecycleEvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +/** + * Callback invoked when a long-lived application exits. 
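A sketch of how the callback interface declared below might be implemented; this listener simply logs both transitions, and the class name is invented for the example. It would be wired in with process.setLifecycleCallback(new LoggingLifecycleListener()) before start().

```java
import org.apache.hadoop.service.workflow.LongLivedProcess;
import org.apache.hadoop.service.workflow.LongLivedProcessLifecycleEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingLifecycleListener implements LongLivedProcessLifecycleEvent {
  private static final Logger LOG =
      LoggerFactory.getLogger(LoggingLifecycleListener.class);

  @Override
  public void onProcessStarted(LongLivedProcess process) {
    LOG.info("started: {}", process);
  }

  @Override
  public void onProcessExited(LongLivedProcess process,
      int exitCode, int signCorrectedCode) {
    LOG.info("exited: {} rawCode={} signCorrected={}",
        process, exitCode, signCorrectedCode);
  }
}
```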
+ */ +public interface LongLivedProcessLifecycleEvent { + + /** + * Callback when a process is started + * @param process the process invoking the callback + */ + void onProcessStarted(LongLivedProcess process); + + /** + * Callback when a process has finished + * @param process the process invoking the callback + * @param exitCode exit code from the process + * @param signCorrectedCode the code- as sign corrected + */ + void onProcessExited(LongLivedProcess process, + int exitCode, + int signCorrectedCode); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingCallable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingCallable.java new file mode 100644 index 0000000..e86a920 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingCallable.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.Service; + +import java.util.concurrent.Callable; + +/** + * A runnable which terminates its owner; it also catches any + * exception raised and can serve it back. + * + */ +public class ServiceTerminatingCallable implements Callable { + + /** + * Owning service: may be null + */ + private final Service owner; + + /** + * Any exception raised + */ + private Exception exception; + /** + * This is the callback + */ + private final Callable callable; + + /** + * Create an instance. If the owner is null, the owning service + * is not terminated. + * @param owner owning service -can be null + * @param callable callback. + */ + public ServiceTerminatingCallable(Service owner, + Callable callable) { + Preconditions.checkArgument(callable != null, "null callable"); + this.owner = owner; + this.callable = callable; + } + + /** + * Get the owning service + * @return the service to receive notification when + * the runnable completes. + */ + public Service getOwner() { + return owner; + } + + /** + * Any exception raised by inner action's run. + * @return an exception or null. + */ + public Exception getException() { + return exception; + } + + /** + * Delegates the call to the callable supplied in the constructor, + * then calls the stop() operation on its owner. + *

+ * Any exception is caught, noted and rethrown + * @return the outcome of the delegated call operation + * @throws Exception if one was raised. + */ + @Override + public V call() throws Exception { + try { + return callable.call(); + } catch (Exception e) { + exception = e; + throw e; + } finally { + if (owner != null) { + owner.stop(); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingRunnable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingRunnable.java new file mode 100644 index 0000000..1043fba --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingRunnable.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.Service; + +/** + * A runnable which terminates its after running; it also catches any + * exception raised and can serve it back. + */ +public class ServiceTerminatingRunnable implements Runnable { + + private final Service owner; + private final Runnable action; + private Exception exception; + + /** + * Create an instance + * @param owner owning service: may be null + * @param action action to execute before terminating the service + */ + public ServiceTerminatingRunnable(Service owner, Runnable action) { + Preconditions.checkArgument(owner != null, "null owner"); + Preconditions.checkArgument(action != null, "null action"); + this.owner = owner; + this.action = action; + } + + /** + * Get the owning service + * @return the service to receive notification when + * the runnable completes. + */ + public Service getOwner() { + return owner; + } + + /** + * Any exception raised by inner action's run. + * @return an exception or null. + */ + public Exception getException() { + return exception; + } + + @Override + public void run() { + try { + action.run(); + } catch (Exception e) { + exception = e; + } + owner.stop(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceThreadFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceThreadFactory.java new file mode 100644 index 0000000..c4891f6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceThreadFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A thread factory that creates threads (possibly daemon threads) + * using the name and naming policy supplied. + * The thread counter starts at 1, increments atomically, + * and is supplied as the second argument in the format string. + *

+ * A static method, {@link #singleThreadExecutor(String, boolean)}, + * exists to simplify the construction of an executor with a single well-named + * thread. + *

+ * Example:
+ * <pre>
+ *   ExecutorService exec = ServiceThreadFactory.singleThreadExecutor("live", true);
+ * </pre>
+ */ +public class ServiceThreadFactory implements ThreadFactory { + + private static AtomicInteger counter = new AtomicInteger(1); + /** + * Default format for thread names: {@value} + */ + public static final String DEFAULT_NAMING_FORMAT = "%s-%03d"; + private final String name; + private final boolean daemons; + private final String namingFormat; + + /** + * Create an instance + * @param name base thread name + * @param daemons flag to indicate the threads should be marked as daemons + * @param namingFormat format string to generate thread names from + */ + public ServiceThreadFactory(String name, + boolean daemons, + String namingFormat) { + Preconditions.checkArgument(name != null, "null name"); + Preconditions.checkArgument(namingFormat != null, "null naming format"); + this.name = name; + this.daemons = daemons; + this.namingFormat = namingFormat; + } + + /** + * + * Create an instance with the default naming format + * @param name base thread name + * @param daemons flag to indicate the threads should be marked as daemons + */ + public ServiceThreadFactory(String name, + boolean daemons) { + this(name, daemons, DEFAULT_NAMING_FORMAT); + } + + @Override + public Thread newThread(Runnable r) { + Preconditions.checkArgument(r != null, "null runnable"); + String threadName = + String.format(namingFormat, name, counter.getAndIncrement()); + Thread thread = new Thread(r, threadName); + thread.setDaemon(daemons); + return thread; + } + + /** + * Create a single thread executor using this naming policy + * @param name base thread name + * @param daemons flag to indicate the threads should be marked as daemons + * @return an executor + */ + public static ExecutorService singleThreadExecutor(String name, + boolean daemons) { + return Executors.newSingleThreadExecutor( + new ServiceThreadFactory(name, daemons)); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCallbackService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCallbackService.java new file mode 100644 index 0000000..9ce395f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCallbackService.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A service that calls the supplied callback when it is started -after the + * given delay. It can be configured to stop itself after the callback has + * completed, marking any exception raised as the exception of this service. + * The notifications come in on a callback thread -a thread that is only + * started in this service's start() operation. + */ +public class WorkflowCallbackService extends + AbstractWorkflowExecutorService { + protected static final Logger LOG = + LoggerFactory.getLogger(WorkflowCallbackService.class); + private final Callable callback; + private final int delay; + private final ServiceTerminatingCallable command; + /** + * This is the callback + */ + private Callable callable; + private ScheduledFuture scheduledFuture; + + /** + * Create an instance of the service + * @param name service name + * @param callback callback to invoke + * @param delay delay -or 0 for no delay + * @param terminate terminate this service after the callback? + */ + public WorkflowCallbackService(String name, + Callable callback, + int delay, + boolean terminate) { + super(name); + Preconditions.checkNotNull(callback, "Null callback argument"); + this.callback = callback; + this.delay = delay; + command = new ServiceTerminatingCallable( + terminate ? this : null, + callback); + } + + public ScheduledFuture getScheduledFuture() { + return scheduledFuture; + } + + @Override + protected void serviceStart() throws Exception { + LOG.debug("Notifying {} after a delay of {} millis", callback, delay); + ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor( + new ServiceThreadFactory(getName(), true)); + setExecutor(executorService); + scheduledFuture = + executorService.schedule(command, delay, TimeUnit.MILLISECONDS); + } + + /** + * Stop the service. + * If there is any exception noted from any executed notification, + * note the exception in this class + * @throws Exception exception. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + // propagate any failure + if (getCallbackException() != null) { + throw getCallbackException(); + } + } + + /** + * Get the exception raised by a callback. Will always be null if the + * callback has not been executed; will only be non-null after any success. + * @return a callback + */ + public Exception getCallbackException() { + return command.getException(); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowClosingService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowClosingService.java new file mode 100644 index 0000000..e828dff --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowClosingService.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.service.AbstractService; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Service that closes the closeable supplied during shutdown, if not null. + * + * As the Service interface itself extends Closeable, this service + * can be used to shut down other services if desired. + */ +public class WorkflowClosingService extends AbstractService { + + /** + * Entity to close + */ + private C closeable; + + /** + * Construct an instance of the service + * @param name service name + * @param closeable closeable to close (may be null) + */ + public WorkflowClosingService(String name, + C closeable) { + super(name); + this.closeable = closeable; + } + + /** + * Construct an instance of the service, using the default name + * @param closeable closeable to close (may be null) + */ + public WorkflowClosingService(C closeable) { + this("WorkflowClosingService", closeable); + } + + /** + * Get the closeable + * @return the closeable + */ + public synchronized C getCloseable() { + return closeable; + } + + /** + * Set or update the closeable. + * @param closeable + */ + public synchronized void setCloseable(C closeable) { + this.closeable = closeable; + } + + /** + * Stop routine will close the closeable -if not null - and set the + * reference to null afterwards + * This operation does raise any exception on the close, though it does + * record it + */ + @Override + protected void serviceStop() { + C target = getCloseable(); + if (target != null) { + try { + target.close(); + } catch (IOException ioe) { + noteFailure(ioe); + } + setCloseable(null); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCompositeService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCompositeService.java new file mode 100644 index 0000000..3d12648 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCompositeService.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceParent; +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * An extended composite service which stops itself if any child service + * fails, or when all its children have successfully stopped without failure. + * + * Lifecycle + *
+ * <ol>
+ *   <li>If any child exits with a failure: this service stops, propagating
+ *   the exception.</li>
+ *   <li>When all child services have stopped, this service stops itself.</li>
+ * </ol>
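A short sketch of that lifecycle in use; the class name is a placeholder, and the two children stand in for any already-constructed services:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.workflow.WorkflowCompositeService;

public final class WorkflowGroupExample {
  // Group two services: the failure of either child, or the
  // completion of both, stops the whole workflow.
  public static WorkflowCompositeService group(Service first, Service second) {
    WorkflowCompositeService workflow =
        new WorkflowCompositeService("workflow", first, second);
    workflow.init(new Configuration());  // inits the children as well
    workflow.start();                    // starts the children in order
    return workflow;
  }
}
```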
+ * + */ +public class WorkflowCompositeService extends CompositeService + implements ServiceParent, ServiceStateChangeListener { + + private static final Logger LOG = + LoggerFactory.getLogger(WorkflowCompositeService.class); + + /** + * Construct an instance + * @param name name of this service instance + */ + public WorkflowCompositeService(String name) { + super(name); + } + + /** + * Construct an instance with the default name. + */ + public WorkflowCompositeService() { + this("WorkflowCompositeService"); + } + + /** + * Varargs constructor + * @param name name of this service instance + * @param children children + */ + public WorkflowCompositeService(String name, Service... children) { + this(name); + for (Service child : children) { + addService(child); + } + } + + /** + * Construct with a list of children + * @param name name of this service instance + * @param children children to add + */ + public WorkflowCompositeService(String name, List children) { + this(name); + for (Service child : children) { + addService(child); + } + } + + /** + * Add a service, and register it + * @param service the {@link Service} to be added. + */ + @Override + public synchronized void addService(Service service) { + Preconditions.checkArgument(service != null, "null service argument"); + service.registerServiceListener(this); + super.addService(service); + } + + /** + * When this service is started, any service stopping with a failure + * exception is converted immediately into a failure of this service, + * storing the failure and stopping ourselves. + * @param child the service that has changed. + */ + @Override + public void stateChanged(Service child) { + //if that child stopped while we are running: + if (isInState(STATE.STARTED) && child.isInState(STATE.STOPPED)) { + // a child service has stopped + //did the child fail? if so: propagate + Throwable failureCause = child.getFailureCause(); + if (failureCause != null) { + LOG.info("Child service {} failed: {}", child, failureCause.toString()); + //failure. Convert to an exception + Exception e = (failureCause instanceof Exception) ? + (Exception) failureCause : new Exception(failureCause); + //flip ourselves into the failed state + noteFailure(e); + stop(); + } else { + LOG.debug("Child service completed {}", child); + if (areAllChildrenStopped()) { + LOG.debug("All children are halted: stopping"); + stop(); + } + } + } + } + + /** + * Probe to query if all children are stopped -simply + * by taking a snapshot of the child service list and enumerating + * their state. + * The state of the children may change during this operation -that will + * not get picked up. + * @return true if all the children are stopped. + */ + private boolean areAllChildrenStopped() { + List children = getServices(); + boolean stopped = true; + for (Service child : children) { + if (!child.isInState(STATE.STOPPED)) { + stopped = false; + break; + } + } + return stopped; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowForkedProcessService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowForkedProcessService.java new file mode 100644 index 0000000..7fea0a9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowForkedProcessService.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.ExitUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Service wrapper for an external program that is launched and can/will terminate. + * This service is notified when the subprocess terminates, and stops itself + * and converts a non-zero exit code into a failure exception. + * + *

+ * Key Features: + *
+ * <ol>
+ *   <li>The property {@link #executionTimeout} can be set to impose a limit
+ *   on the duration of the process.</li>
+ *   <li>Output is streamed to the output logger provided.</li>
+ *   <li>The most recent lines of output are saved to a linked list.</li>
+ *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent},
+ *   is raised on the start and finish of a process.</li>
+ * </ol>
+ * + * Usage: + *

+ * The service can be built in the constructor, {@link #WorkflowForkedProcessService(String, Map, List)}, + * or have its simple constructor used to instantiate the service, then the + * {@link #build(Map, List)} method used to define the environment variables + * and list of commands to execute. One of these two options MUST be exercised + * before calling the service's {@link #start()} method. + *
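A hedged sketch of the second route (simple constructor followed by build()); the command and class name are illustrative only:

```java
import java.util.Arrays;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.workflow.WorkflowForkedProcessService;

public class ForkExample {
  public static void main(String[] args) throws Exception {
    WorkflowForkedProcessService printer =
        new WorkflowForkedProcessService("printer");
    printer.init(new Configuration());
    // define the environment and command line before start()
    printer.build(Collections.<String, String>emptyMap(),
        Arrays.asList("echo", "hello"));
    printer.setTimeout(30000, 1);  // optional: fail with exit code 1 after 30s
    printer.start();               // forks the process; the service stops itself on exit
  }
}
```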

+ * The forked process is executed in the service's {@link #serviceStart()} method; + * if still running when the service is stopped, {@link #serviceStop()} will + * attempt to stop it. + *

+ * + * The service delegates process execution to {@link LongLivedProcess}, + * receiving callbacks via the {@link LongLivedProcessLifecycleEvent}. + * When the service receives a callback notifying that the process has completed, + * it calls its {@link #stop()} method. If the error code was non-zero, + * the service is logged as having failed. + */ +public class WorkflowForkedProcessService + extends AbstractWorkflowExecutorService + implements LongLivedProcessLifecycleEvent, Runnable { + + /** + * Log for the forked master process + */ + private static final Logger LOG = + LoggerFactory.getLogger(WorkflowForkedProcessService.class); + public static final String ERROR_PROCESS_NOT_SET = "Process not yet configured"; + + private final AtomicBoolean processTerminated = new AtomicBoolean(false); + private boolean processStarted = false; + private LongLivedProcess process; + private int executionTimeout = -1; + private int timeoutCode = 1; + + /** + log to log to; defaults to this service log + */ + private Logger processLog = LOG; + + /** + * Exit code set when the spawned process exits + */ + private final AtomicInteger exitCode = new AtomicInteger(0); + + /** + * Create an instance of the service + * @param name a name + */ + public WorkflowForkedProcessService(String name) { + super(name); + } + + /** + * Create an instance of the service, set up the process + * @param name a name + * @param commandList list of commands is inserted on the front + * @param env environment variables above those generated by + * @throws IOException IO problems + */ + public WorkflowForkedProcessService(String name, + Map env, + List commandList) throws IOException { + super(name); + build(env, commandList); + } + + /** + * Start the service by starting the inner process. + * @throws ServiceStateException if no process has been set up to launch. + * @throws Exception + */ + @Override //AbstractService + protected void serviceStart() throws Exception { + if (process == null) { + throw new ServiceStateException(ERROR_PROCESS_NOT_SET); + } + //now spawn the process -expect updates via callbacks + process.start(); + } + + @Override //AbstractService + protected void serviceStop() throws Exception { + //tag as completed if not already; use the current exit code + completed(exitCode.get()); + stopForkedProcess(); + } + + /** + * Stopped the forked process + */ + private void stopForkedProcess() { + if (process != null) { + process.stop(); + } + } + + /** + * Set the process log. 
This may be null for "do not log" + * @param processLog process log + */ + public void setProcessLog(Logger processLog) { + this.processLog = processLog; + process.setProcessLog(processLog); + } + + /** + * Set the timeout by which time a process must have finished -or -1 for forever + * @param timeout timeout in milliseconds + */ + public void setTimeout(int timeout, int code) { + this.executionTimeout = timeout; + this.timeoutCode = code; + } + + /** + * Build the process to execute when the service is started + * @param commandList list of commands is inserted on the front + * @param env environment variables above those generated by + * @throws IOException IO problems + */ + public void build(Map env, + List commandList) + throws IOException { + Preconditions.checkState(process == null, "process already started"); + process = new LongLivedProcess(getName(), processLog, commandList); + process.setLifecycleCallback(this); + //set the env variable mapping + process.putEnvMap(env); + } + + @Override // notification from executed process + public synchronized void onProcessStarted(LongLivedProcess process) { + LOG.debug("Process has started"); + processStarted = true; + if (executionTimeout > 0) { + setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true)); + execute(this); + } + } + + @Override // notification from executed process + public void onProcessExited(LongLivedProcess ps, + int uncorrected, + int code) { + try { + synchronized (this) { + completed(code); + //note whether or not the service had already stopped + LOG.debug("Process has exited with exit code {}", code); + if (code != 0) { + reportFailure(code, getName() + " failed with code " + code); + } + } + } finally { + stop(); + } + } + + /** + * Report a failure by building an {@link ExitUtil.ExitException} + * with the given exit code, then calling {@link #noteFailure(Exception)} + * to log it as this services failure exception. + * @param code exit code + * @param text error text + */ + private void reportFailure(int code, String text) { + //error + ExitUtil.ExitException execEx = new ExitUtil.ExitException(code, text); + LOG.debug("Noting failure", execEx); + noteFailure(execEx); + } + + /** + * handle timeout response by escalating it to a failure + */ + @Override + public void run() { + try { + synchronized (processTerminated) { + if (!processTerminated.get()) { + processTerminated.wait(executionTimeout); + } + } + + } catch (InterruptedException e) { + LOG.info("Interrupted"); + //assume signalled; exit + } + //check the status; if the marker isn't true, bail + if (!processTerminated.getAndSet(true)) { + LOG.info("process timeout: reporting error code {}", timeoutCode); + + //timeout + if (isInState(Service.STATE.STARTED)) { + //trigger a failure + stopForkedProcess(); + } + reportFailure(timeoutCode, + getName() + ": timeout after " + executionTimeout + + " millis: exit code =" + timeoutCode); + } + } + + /** + * Note the process as having completed. + * The exit code is stored, the process marked as terminated + * -and anything synchronized on processTerminated + * is notified + * @param code exit code + */ + protected void completed(int code) { + LOG.debug("Completed with exit code {}", code); + synchronized (processTerminated) { + // set the exit first, to guarantee that it will always be valid when processTerminated holds. 
+ // only do this if the value is currently 0 + exitCode.compareAndSet(0, code); + processTerminated.set(true); + processTerminated.notify(); + } + } + + /** + * Is the process terminated? This is false until the process is started and then completes. + * @return true if the process has been executed to completion. + */ + public boolean isProcessTerminated() { + return processTerminated.get(); + } + + /** + * Has the process started? + * @return true if the process started. + */ + public synchronized boolean didProcessStart() { + return processStarted; + } + + /** + * Is a process running: between started and terminated + * @return true if the process is up. + */ + public synchronized boolean isProcessRunning() { + return processStarted && !isProcessTerminated(); + } + + + /** + * Get the process exit code. + *

+ * If the process has not yet completed, this will be zero. + * @return an exit code in the range -127 to +128 + */ + public int getExitCode() { + return exitCode.get(); + } + + /** + * Get the recent output from the process, or an empty if not defined + * @return a possibly empty list + */ + public List getRecentOutput() { + return process != null + ? process.getRecentOutput() + : new LinkedList(); + } + + /** + * Get the recent output from the process, or [] if not defined + * + * @param finalOutput flag to indicate "wait for the final output of the process" + * @param duration the duration, in ms, + * to wait for recent output to become non-empty + * @return a possibly empty list + */ + public List getRecentOutput(boolean finalOutput, int duration) { + if (process == null) { + return new LinkedList<>(); + } + return process.getRecentOutput(finalOutput, duration); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(super.toString()); + if (process != null) { + sb.append(", ").append(process.toString()); + } + sb.append(", processStarted=").append(processStarted); + sb.append(", processTerminated=").append(processTerminated); + sb.append(", exitCode=").append(exitCode); + return sb.toString(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowRpcService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowRpcService.java new file mode 100644 index 0000000..b1e22c1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowRpcService.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.AbstractService; + +import java.net.InetSocketAddress; + +/** + * A YARN service that maps the start/stop lifecycle of an RPC server + * to the YARN service lifecycle. + */ +public class WorkflowRpcService extends AbstractService { + + /** + * RPC server. + *

+ * Invariant: this is non-null. + */ + private final Server server; + + /** + * Construct an instance + * @param name service name + * @param server service to stop. It must not be null + * @throws IllegalArgumentException if server is null. + */ + public WorkflowRpcService(String name, Server server) { + super(name); + Preconditions.checkArgument(server != null, "Null server"); + this.server = server; + } + + /** + * Get the server + * @return the server + */ + public Server getServer() { + return server; + } + + /** + * Get the socket address of this server. + * @return the address this server is listening on + */ + public InetSocketAddress getConnectAddress() { + return NetUtils.getConnectAddress(server); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + server.start(); + } + + /** + * Stop the service. + * + * @throws Exception any failure. + */ + @Override + protected void serviceStop() throws Exception { + server.stop(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowSequenceService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowSequenceService.java new file mode 100644 index 0000000..2c7e43d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowSequenceService.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceParent; +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.service.ServiceStateException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This resembles the YARN CompositeService, except that it + * starts one service after another + *

+ * Workflow
+ * 1. When the WorkflowSequenceService instance is initialized, it only
+ *    initializes itself.
+ * 2. When the WorkflowSequenceService instance is started, it initializes
+ *    and then starts the first of its children. If there are no children,
+ *    it immediately stops.
+ * 3. When the active child stops, if it did not fail and the parent has not
+ *    stopped, then the next service is initialized and started. If there is
+ *    no remaining child, the parent service stops.
+ * 4. If the active child did fail, the parent service notes the exception
+ *    and stops, effectively propagating the failure up.
+ * + * New service instances MAY be added to a running instance -but no guarantees + * can be made as to whether or not they will be run. + */ + +public class WorkflowSequenceService extends AbstractService implements + ServiceParent, ServiceStateChangeListener { + + private static final Logger LOG = + LoggerFactory.getLogger(WorkflowSequenceService.class); + + /** + * list of services + */ + private final List serviceList = new ArrayList<>(); + + /** + * The currently active service. + * Volatile -may change & so should be read into a + * local variable before working with + */ + private volatile Service activeService; + + /** + * The previous service to the one (if any) that is running. + * null if none have finished yet; this implicitly + * holds before any service has been started. + */ + private volatile Service previousService; + + /** + * Construct an instance + * @param name service name + */ + public WorkflowSequenceService(String name) { + super(name); + } + + /** + * Construct an instance with the default name + */ + public WorkflowSequenceService() { + this("WorkflowSequenceService"); + } + + /** + * Create a service sequence with the given list of services + * @param name service name + * @param children initial sequence + */ + public WorkflowSequenceService(String name, Service... children) { + super(name); + for (Service service : children) { + addService(service); + } + } + + /** + * Create a service sequence with the given list of services + * @param name service name + * @param children initial sequence + */ + public WorkflowSequenceService(String name, List children) { + super(name); + for (Service service : children) { + addService(service); + } + } + + /** + * Get the current service -which may be null + * @return service running + */ + public Service getActiveService() { + return activeService; + } + + /** + * Get the previously active service + * @return the service last run, or null if there is none. + */ + public Service getPreviousService() { + return previousService; + } + + /** + * start the first service. If there are none to start, stop() this service. + * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + if (!startNextService()) { + //nothing to start -so stop + stop(); + } + } + + /** + * Stop the service. This stops any currently running child service. + * @throws Exception on any failure + */ + @Override + protected void serviceStop() throws Exception { + //stop current service. + //this triggers a callback that is caught and ignored + Service current = activeService; + previousService = current; + activeService = null; + if (current != null) { + current.stop(); + } + } + + /** + * Start the next service in the list. 
+ * Return false if there are no more services to run, or this + * service has stopped + * @return true if a service was started + * @throws RuntimeException from any init or start failure + * @throws ServiceStateException if this call is made before + * the service is started + */ + public synchronized boolean startNextService() { + LOG.debug("Starting next child service"); + if (isInState(STATE.STOPPED)) { + //downgrade to a failed + LOG.debug("Not starting next service -{} is stopped", this); + return false; + } + if (!isInState(STATE.STARTED)) { + //reject attempts to start a service too early + throw new ServiceStateException( + "Cannot start a child service when not started"); + } + if (serviceList.isEmpty()) { + //nothing left to run + LOG.debug("No services left to start"); + return false; + } + if (activeService != null && activeService.getFailureCause() != null) { + //did the last service fail? Is this caused by some premature callback? + LOG.debug("Not starting next service due to a failure of {}", + activeService); + return false; + } + + //bear in mind that init & start can fail, which + //can trigger re-entrant calls into the state change listener. + //by setting the current service to null + //the start-next-service logic is skipped. + //now, what does that mean w.r.t exit states? + + activeService = null; + Service head = serviceList.remove(0); + LOG.debug("Starting {}", head); + try { + head.init(getConfig()); + head.registerServiceListener(this); + head.start(); + } catch (RuntimeException e) { + noteFailure(e); + throw e; + } + //at this point the service must have explicitly started & not failed, + //else an exception would have been raised + activeService = head; + + return true; + } + + /** + * State change event relays service stop events to + * {@link #onServiceCompleted(Service)}. Subclasses can + * extend that with extra logic + * @param service the service that has changed. + */ + @Override + public void stateChanged(Service service) { + // only react to the state change when it is the current service + // and it has entered the STOPPED state + if (service == activeService && service.isInState(STATE.STOPPED)) { + onServiceCompleted(service); + } + } + + /** + * handler for service completion: base class starts the next service + * @param service service that has completed + */ + protected synchronized void onServiceCompleted(Service service) { + LOG.info("Running service stopped: {}", service); + previousService = activeService; + + + //start the next service if we are not stopped ourselves + if (isInState(STATE.STARTED)) { + + //did the service fail? if so: propagate + Throwable failureCause = service.getFailureCause(); + if (failureCause != null) { + Exception e = (failureCause instanceof Exception) + ? 
(Exception) failureCause + : new Exception(failureCause); + noteFailure(e); + stop(); + } + + //start the next service + boolean started; + try { + started = startNextService(); + } catch (Exception e) { + //something went wrong here + noteFailure(e); + started = false; + } + if (!started) { + //no start because list is empty + //stop and expect the notification to go upstream + stop(); + } + } else { + //not started, so just note that the current service + //has gone away + activeService = null; + } + } + + /** + * Add the passed {@link Service} to the list of services managed by this + * {@link WorkflowSequenceService} + * @param service the {@link Service} to be added + */ + @Override + public synchronized void addService(Service service) { + Preconditions.checkArgument(service != null, "null service argument"); + LOG.debug("Adding service {} ", service.getName()); + synchronized (serviceList) { + serviceList.add(service); + } + } + + /** + * Get an unmodifiable list of services + * @return a list of child services at the time of invocation - + * added services will not be picked up. + */ + @Override //Parent + public synchronized List getServices() { + return Collections.unmodifiableList(serviceList); + } + + @Override // Object + public synchronized String toString() { + return super.toString() + "; current service " + activeService + + "; queued service count=" + serviceList.size(); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/package-info.java new file mode 100644 index 0000000..e34942e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/package-info.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +/** + +

+This package contains classes which can be aggregated to build up +complex workflows of services: sequences of operations, callbacks +and composite services with a shared lifespan. + +

+Core concepts: +


+The Workflow Services are a set of Hadoop YARN services, all implementing
+the {@link org.apache.hadoop.service.Service} API.
+They are designed to be aggregated and composed to produce larger
+composite services which then perform ordered operations, notify other services
+when work has completed, and propagate failure up the service hierarchy.

+Service instances may have a limited lifespan, and may self-terminate when
+they consider it appropriate.

+ +

+Workflow Services that have children implement the
+{@link org.apache.hadoop.service.ServiceParent}
+interface, which provides (thread-safe) access to the children -allowing new
+children to be added and existing children to be enumerated. They implement
+policies on how to react to the termination of children -and so can sequence
+operations which terminate themselves when complete.

+Workflow Services may be subclassed to extend their behavior, or to use them +in specific applications. Just as the standard +{@link org.apache.hadoop.service.CompositeService} +is often subclassed to aggregate child services, the +{@link org.apache.hadoop.service.workflow.WorkflowCompositeService} +can be used instead -adding the feature that failing services trigger automatic +parent shutdown. If that is the desired operational mode of a class, +swapping the composite service implementation may be sufficient to adopt it. + + +
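+
+A minimal sketch of that swap (imports elided; the MyDaemon subclass is
+hypothetical, not part of this package):
+
+  public class MyDaemon extends WorkflowCompositeService {
+    public MyDaemon(Service... children) {
+      // children are inited and started with this parent; because the base
+      // class is WorkflowCompositeService rather than CompositeService, a
+      // child that stops or fails now stops this parent as well
+      super("MyDaemon", children);
+    }
+  }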

How do the workflow services differ from the YARN Composite Service?

+ + +There was originally one YARN service for managing children, the +{@link org.apache.hadoop.service.CompositeService}. +

+The {@link org.apache.hadoop.service.workflow.WorkflowCompositeService}
+shares the same model of "child services, all inited and started together".
+Where it differs is that if any child service stops -either due to a failure
+or to an action which invokes that service's
+{@link org.apache.hadoop.service.Service#stop()} method- then the parent
+WorkflowCompositeService is stopped as well.

+ +In contrast, the original {@link org.apache.hadoop.service.CompositeService} +class starts its children in its {@link org.apache.hadoop.service.Service#start()} +method, but does not listen or react to any child service halting. + +As a result, changes in child +state are not automatically detected or propagated, other than failures in +the actual init() and start() methods. + +

+ If a child service runs until completed -that is, it will not be stopped
+ until instructed to do so- and if it is only the parent service that attempts
+ to stop the child, then this difference is unimportant.

+ However, for any service that depends upon all its child services running
+ -and whose child services are written so as to stop when they fail- using
+ the WorkflowCompositeService as a base class will enable the
+ parent service to be automatically notified of a child stopping.

+The {@link org.apache.hadoop.service.workflow.WorkflowSequenceService} +resembles the composite service in API, but its workflow is different. It +initializes and starts its children one-by-one, only starting the second after +the first one succeeds, the third after the second, etc. If any service in +the sequence fails, the parent WorkflowSequenceService stops, +reporting the same exception. + +
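+
+A minimal sketch (imports elided; the three already-constructed child
+services prepare, migrate and serve are illustrative names only):
+
+  WorkflowSequenceService startup =
+      new WorkflowSequenceService("startup", prepare, migrate, serve);
+  startup.init(new Configuration());
+  startup.start();
+  // "prepare" runs to completion, then "migrate", then "serve"; a failure in
+  // any child stops the sequence and propagates that child's exception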

+The {@link org.apache.hadoop.service.workflow.WorkflowForkedProcessService}: +Executes a process when started, and binds to the life of that process. When the +process terminates, so does the service -and vice versa. This service enables +external processes to be executed as part of a sequence of operations -or, +using the {@link org.apache.hadoop.service.workflow.WorkflowCompositeService} +in parallel with other services, terminating the process when the other services +stop -and vice versa. + +

+The {@link org.apache.hadoop.service.workflow.WorkflowCallbackService} +executes a {@link java.util.concurrent.Callable} callback a specified delay +after the service is started, then potentially terminates itself. +This is useful for callbacks when a workflow reaches a specific point +-or simply for executing arbitrary code in the workflow. + + +
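+
+A minimal sketch, modelled on the tests in this patch (imports elided; the
+String generic parameter, the delay being in milliseconds, and the final
+constructor flag meaning "terminate the service once the callback has run"
+are all assumptions):
+
+  Callable<String> notifier = new Callable<String>() {
+    @Override
+    public String call() {
+      return "checkpoint reached";
+    }
+  };
+  WorkflowCallbackService<String> callback =
+      new WorkflowCallbackService<>("callback", notifier, 100, true);
+  callback.init(new Configuration());
+  callback.start();   // notifier.call() runs after the configured delay
+  // the callable's result is later available from
+  // callback.getScheduledFuture().get()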

+Other Workflow Services +

+ +There are some minor services that have proven useful within aggregate workflows, +and simply in applications which are built from composite YARN services. + +
    +
+  • {@link org.apache.hadoop.service.workflow.WorkflowRpcService}:
+    Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
+    When the service is started, so is the RPC server. Similarly, when the
+    service is stopped, so is the RPC server instance (see the sketch after
+    this list).
+
+  • {@link org.apache.hadoop.service.workflow.WorkflowClosingService}: Closes
+    an instance of {@link java.io.Closeable} when the service is stopped. This
+    is purely a housekeeping class.
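+
+A minimal sketch of the RPC wrapper (imports elided; rpcServer is assumed to
+be an already-built {@link org.apache.hadoop.ipc.Server} instance):
+
+  WorkflowRpcService rpc = new WorkflowRpcService("rpc", rpcServer);
+  rpc.init(new Configuration());
+  rpc.start();                                  // starts the wrapped RPC server
+  InetSocketAddress bound = rpc.getConnectAddress();
+  // ... serve requests ...
+  rpc.stop();                                   // stops the RPC server with the service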

Lower-level classes

+ +
    +
+  • {@link org.apache.hadoop.service.workflow.ServiceTerminatingRunnable}:
+    A {@link java.lang.Runnable} which runs the runnable supplied in its
+    constructor, then signals its owning service to stop once that runnable
+    has completed. Any exception raised in the run is stored.
+
+  • {@link org.apache.hadoop.service.workflow.AbstractWorkflowExecutorService}:
+    A base class for services that wish to have a {@link java.util.concurrent.ExecutorService}
+    with a lifespan mapped to that of the service. When the service is stopped, the
+    {@link java.util.concurrent.ExecutorService#shutdownNow()} method is called to
+    attempt to shut down all running tasks.
+
+  • {@link org.apache.hadoop.service.workflow.ServiceThreadFactory}:
+    A simple {@link java.util.concurrent.ThreadFactory} which generates
+    meaningful thread names. It can be used as a parameter to constructors of
+    {@link java.util.concurrent.ExecutorService} instances, to ensure that
+    log messages can be tied back to the related services. A sketch combining
+    these classes follows this list.
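+
+A sketch of how these classes combine, following the usage in this patch's
+tests (imports elided; the WorkerService subclass and the myTask runnable
+are hypothetical, and the two-argument constructor is assumed to be
+accessible to subclasses):
+
+  class WorkerService extends AbstractWorkflowExecutorService {
+    WorkerService() {
+      // a single-thread executor whose threads carry the given name; the
+      // boolean flag is assumed to mark the threads as daemons
+      super("worker", ServiceThreadFactory.singleThreadExecutor("worker", true));
+    }
+    void runThenStop(Runnable task) {
+      // ServiceTerminatingRunnable stops this service once the task completes
+      execute(new ServiceTerminatingRunnable(this, task));
+    }
+  }
+
+  WorkerService worker = new WorkerService();
+  worker.init(new Configuration());
+  worker.start();
+  worker.runThenStop(myTask);   // myTask is a hypothetical Runnable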
+ + + */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/EndOfServiceWaiter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/EndOfServiceWaiter.java new file mode 100644 index 0000000..8416faa --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/EndOfServiceWaiter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.junit.Assert; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * a {@link ServiceStateChangeListener} that waits for a service to stop; + */ +public class EndOfServiceWaiter implements ServiceStateChangeListener { + + private final AtomicBoolean finished = new AtomicBoolean(false); + + public EndOfServiceWaiter(Service svc) { + svc.registerServiceListener(this); + } + + /** + * Wait for a service to stop. Raises an assertion if the + * service does not stop in the time period. + * @param timeout time to wait in millis before failing. + * @throws InterruptedException + */ + public synchronized void waitForServiceToStop(long timeout) throws + InterruptedException { + if (!finished.get()) { + wait(timeout); + } + Assert.assertTrue("Service did not finish in time period", + finished.get()); + } + + @Override + public synchronized void stateChanged(Service service) { + if (service.isInState(Service.STATE.STOPPED)) { + finished.set(true); + notify(); + } + } + + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/MockService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/MockService.java new file mode 100644 index 0000000..823340c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/MockService.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.ServiceStateException; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Mock service which can be set to fail at different points + */ +public class MockService extends AbstractService { + private final boolean fail; + private final int lifespan; + private final ExecutorService executorService = + Executors.newSingleThreadExecutor(); + + /** + * Create with an infinite lifespan and no failures + */ + MockService() { + this("mock", false, -1); + } + + /** + * Create with a name, optional failure option and a lifespan in millis + * @param name + * @param fail + * @param lifespan + */ + MockService(String name, boolean fail, int lifespan) { + super(name); + this.fail = fail; + this.lifespan = lifespan; + } + + /** + * Start the service. This will start an asynchronous thread (in an executor service) + * which will sleep for the given time period. + * @throws Exception on any failure + */ + @Override + protected void serviceStart() throws Exception { + //act on the lifespan here + if (lifespan > 0) { + executorService.submit(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(lifespan); + } catch (InterruptedException ignored) { + + } + finish(); + } + }); + } else { + if (lifespan == 0) { + finish(); + } else { + //continue until told not to + } + } + } + + /** + * Finish the service + */ + private void finish() { + if (fail) { + ServiceStateException e = + new ServiceStateException(getName() + " failed"); + + noteFailure(e); + stop(); + throw e; + } else { + stop(); + } + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ParentWorkflowTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ParentWorkflowTestBase.java new file mode 100644 index 0000000..499db22 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ParentWorkflowTestBase.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceParent; + +/** + * Extends {@link WorkflowServiceTestBase} with parent-specific operations + * and logic to build up and run the parent service + */ +public abstract class ParentWorkflowTestBase extends WorkflowServiceTestBase { + + /** + * Wait a second for the service parent to stop + * @param parent the service to wait for + */ + protected void waitForParentToStop(ServiceParent parent) { + waitForParentToStop(parent, 1000); + } + + /** + * Wait for the service parent to stop + * @param parent the service to wait for + * @param timeout time in milliseconds + */ + protected void waitForParentToStop(ServiceParent parent, int timeout) { + boolean stop = parent.waitForServiceToStop(timeout); + if (!stop) { + logState(parent); + fail("Service failed to stop : after " + timeout + " millis " + parent); + } + } + + /** + * Subclasses are require to implement this and return an instance of a + * ServiceParent + * @param services a possibly empty list of services + * @return an inited -but -not-started- service parent instance + */ + protected abstract ServiceParent buildService(Service... services); + + /** + * Use {@link #buildService(Service...)} to create service and then start it + * @param services + * @return + */ + protected ServiceParent startService(Service... services) { + ServiceParent parent = buildService(services); + //expect service to start and stay started + parent.start(); + return parent; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ProcessCommandFactory.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ProcessCommandFactory.java new file mode 100644 index 0000000..5e5dc19 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ProcessCommandFactory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * A source of commands, with the goal being to allow for adding different + * implementations for different platforms + */ +public class ProcessCommandFactory { + + protected ProcessCommandFactory() { + } + + /** + * The command to list a directory + * @param dir directory + * @return commands + */ + public List ls(File dir) { + List commands = new ArrayList<>(5); + commands.add("ls"); + commands.add("-1"); + commands.add(dir.getAbsolutePath()); + return commands; + } + + /** + * Echo some text to stdout + * @param text text + * @return commands + */ + public List echo(String text) { + List commands = new ArrayList<>(5); + commands.add("echo"); + commands.add(text); + return commands; + } + + /** + * print env variables + * @return commands + */ + public List env() { + List commands = new ArrayList<>(1); + commands.add("env"); + return commands; + } + + /** + * execute a command that returns with an error code that will + * be converted into a number + * @return commands + */ + public List exitFalse() { + List commands = new ArrayList<>(2); + commands.add("false"); + return commands; + } + + /** + * Create a process command factory for this OS + * @return + */ + public static ProcessCommandFactory createProcessCommandFactory() { + return new ProcessCommandFactory(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/SimpleRunnable.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/SimpleRunnable.java new file mode 100644 index 0000000..d948c68 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/SimpleRunnable.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +/** + * Test runnable that can be made to exit, or throw an exception + * during its run + */ +class SimpleRunnable implements Runnable { + boolean throwException = false; + + /** + * Create an instance + */ + SimpleRunnable() { + } + + /** + * Create an instance + * @param throwException throw an exception in the run operation + */ + SimpleRunnable(boolean throwException) { + this.throwException = throwException; + } + + @Override + public synchronized void run() { + try { + if (throwException) { + throw new RuntimeException("SimpleRunnable"); + } + } finally { + this.notify(); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestLongLivedProcess.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestLongLivedProcess.java new file mode 100644 index 0000000..a323ca5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestLongLivedProcess.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +/** + * Test the {@link LongLivedProcess} class + */ +public class TestLongLivedProcess extends WorkflowServiceTestBase implements + LongLivedProcessLifecycleEvent { + private static final Logger + LOG = LoggerFactory.getLogger(TestLongLivedProcess.class); + + private LongLivedProcess process; + private File testDir = new File("target"); + private ProcessCommandFactory commandFactory; + private volatile boolean started, stopped; + + @Before + public void setupProcesses() { + commandFactory = ProcessCommandFactory.createProcessCommandFactory(); + } + + @After + public void stopProcesses() { + if (process != null) { + process.stop(); + } + } + + @Test + public void testLs() throws Throwable { + + initProcess(commandFactory.ls(testDir)); + process.start(); + //in-thread wait + process.run(); + + //here stopped + assertTrue("process start callback not received", started); + assertTrue("process stop callback not received", stopped); + assertFalse(process.isRunning()); + assertEquals(0, process.getExitCode().intValue()); + + assertStringInOutput("test-classes", getFinalOutput()); + } + + @Test + public void testExitCodes() throws Throwable { + + initProcess(commandFactory.exitFalse()); + process.start(); + //in-thread wait + process.run(); + + //here stopped + + assertFalse(process.isRunning()); + int exitCode = process.getExitCode(); + assertTrue(exitCode != 0); + int corrected = process.getExitCodeSignCorrected(); + + assertEquals(1, corrected); + } + + @Test + public void testEcho() throws Throwable { + + String echoText = "hello, world"; + initProcess(commandFactory.echo(echoText)); + process.start(); + //in-thread wait + process.run(); + + //here stopped + assertTrue("process stop callback not received", stopped); + assertEquals(0, process.getExitCode().intValue()); + assertStringInOutput(echoText, getFinalOutput()); + } + + @Test + public void testSetenv() throws Throwable { + + String var = "TEST_RUN"; + String val = "TEST-RUN-ENV-VALUE"; + initProcess(commandFactory.env()); + process.setEnv(var, val); + process.start(); + //in-thread wait + process.run(); + + //here stopped + assertTrue("process stop callback not received", stopped); + assertEquals(0, process.getExitCode().intValue()); + assertStringInOutput(val, getFinalOutput()); + } + + /** + * Get the final output. 
includes a quick sleep for the tail output + * @return the last output + */ + private List getFinalOutput() { + return process.getRecentOutput(); + } + + private LongLivedProcess initProcess(List commands) { + process = new LongLivedProcess(name.getMethodName(), LOG, commands); + process.setLifecycleCallback(this); + return process; + } + + /** + * Handler for callback events on the process + */ + + @Override + public void onProcessStarted(LongLivedProcess process) { + started = true; + } + + /** + * Handler for callback events on the process + */ + @Override + public void onProcessExited(LongLivedProcess process, + int exitCode, + int signCorrectedCode) { + stopped = true; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowClosingService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowClosingService.java new file mode 100644 index 0000000..630e352 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowClosingService.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Test the {@link WorkflowClosingService} + */ +public class TestWorkflowClosingService extends WorkflowServiceTestBase { + + @Test + public void testSimpleClose() throws Throwable { + WorkflowClosingService svc = instance(false); + OpenClose openClose = svc.getCloseable(); + assertFalse(openClose.closed); + svc.stop(); + assertTrue(openClose.closed); + } + + @Test + public void testNullClose() throws Throwable { + WorkflowClosingService + svc = new WorkflowClosingService<>(null); + svc.init(new Configuration()); + svc.start(); + assertNull(svc.getCloseable()); + svc.stop(); + } + + @Test + public void testFailingClose() throws Throwable { + WorkflowClosingService svc = instance(false); + OpenClose openClose = svc.getCloseable(); + openClose.raiseExceptionOnClose = true; + svc.stop(); + assertTrue(openClose.closed); + Throwable cause = svc.getFailureCause(); + assertNotNull(cause); + + //retry should be a no-op + svc.close(); + } + + @Test + public void testDoubleClose() throws Throwable { + WorkflowClosingService svc = instance(false); + OpenClose openClose = svc.getCloseable(); + openClose.raiseExceptionOnClose = true; + svc.stop(); + assertTrue(openClose.closed); + Throwable cause = svc.getFailureCause(); + assertNotNull(cause); + openClose.closed = false; + svc.stop(); + assertEquals(cause, svc.getFailureCause()); + } + + /** + * This does not recurse forever, as the service has already entered the + * STOPPED state before the inner close tries to stop it -that operation + * is a no-op + * @throws Throwable + */ + @Test + public void testCloseSelf() throws Throwable { + WorkflowClosingService svc = + new WorkflowClosingService<>(null); + svc.setCloseable(svc); + svc.stop(); + } + + + private WorkflowClosingService instance(boolean raiseExceptionOnClose) { + WorkflowClosingService + svc = new WorkflowClosingService(new OpenClose( + raiseExceptionOnClose)); + svc.init(new Configuration()); + svc.start(); + return svc; + } + + private static class OpenClose implements Closeable { + public boolean closed = false; + public boolean raiseExceptionOnClose; + + private OpenClose(boolean raiseExceptionOnClose) { + this.raiseExceptionOnClose = raiseExceptionOnClose; + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + if (raiseExceptionOnClose) { + throw new IOException("OpenClose"); + } + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowCompositeService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowCompositeService.java new file mode 100644 index 0000000..352f6d9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowCompositeService.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceParent; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the {@link org.apache.hadoop.service.workflow.WorkflowCompositeService} + */ +public class TestWorkflowCompositeService extends ParentWorkflowTestBase { + private static final Logger + log = LoggerFactory.getLogger(TestWorkflowCompositeService.class); + + @Test + public void testSingleChild() throws Throwable { + Service parent = startService(new MockService()); + parent.stop(); + } + + @Test + public void testSingleChildTerminating() throws Throwable { + ServiceParent parent = + startService(new MockService("1", false, 100)); + waitForParentToStop(parent); + } + + @Test + public void testSingleChildFailing() throws Throwable { + ServiceParent parent = + startService(new MockService("1", true, 100)); + waitForParentToStop(parent); + assert parent.getFailureCause() != null; + } + + @Test + public void testTwoChildren() throws Throwable { + MockService one = new MockService("one", false, 100); + MockService two = new MockService("two", false, 100); + ServiceParent parent = startService(one, two); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(two); + } + + @Test + public void testCallableChild() throws Throwable { + + MockService one = new MockService("one", false, 100); + CallableHandler handler = new CallableHandler("hello"); + WorkflowCallbackService ens = + new WorkflowCallbackService("handler", handler, 100, true); + MockService two = new MockService("two", false, 100); + ServiceParent parent = startService(one, ens, two); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(ens); + assertStopped(two); + assertTrue(handler.notified); + String s = ens.getScheduledFuture().get(); + assertEquals("hello", s); + } + + @Test + public void testNestedComposite() throws Throwable { + MockService one = new MockService("one", false, 100); + MockService two = new MockService("two", false, 100); + ServiceParent parent = buildService(one, two); + ServiceParent outer = startService(parent); + assertTrue(outer.waitForServiceToStop(1000)); + assertStopped(one); + assertStopped(two); + } + + @Test + public void testFailingComposite() throws Throwable { + MockService one = new MockService("one", true, 10); + MockService two = new MockService("two", false, 1000); + ServiceParent parent = startService(one, two); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(two); + assertNotNull(one.getFailureCause()); + assertNotNull(parent.getFailureCause()); + assertEquals(one.getFailureCause(), parent.getFailureCause()); + } + + @Override + public ServiceParent buildService(Service... 
services) { + ServiceParent parent = + new WorkflowCompositeService("test", services); + parent.init(new Configuration()); + return parent; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowExecutorService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowExecutorService.java new file mode 100644 index 0000000..5b3fcea --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowExecutorService.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import org.junit.Test; + +/** + * Test the {@link AbstractWorkflowExecutorService} + */ +public class TestWorkflowExecutorService extends WorkflowServiceTestBase { + + @Test + public void testAsyncRun() throws Throwable { + + ExecutorSvc svc = run(new ExecutorSvc()); + ServiceTerminatingRunnable runnable = + new ServiceTerminatingRunnable(svc, new SimpleRunnable()); + + // synchronous in-thread execution + svc.execute(runnable); + Thread.sleep(1000); + assertStopped(svc); + } + + @Test + public void testFailureRun() throws Throwable { + + ExecutorSvc svc = run(new ExecutorSvc()); + ServiceTerminatingRunnable runnable = + new ServiceTerminatingRunnable(svc, new SimpleRunnable(true)); + + // synchronous in-thread execution + svc.execute(runnable); + Thread.sleep(1000); + assertStopped(svc); + assertNotNull(runnable.getException()); + } + + private static class ExecutorSvc extends AbstractWorkflowExecutorService { + private ExecutorSvc() { + super("ExecutorService", + ServiceThreadFactory.singleThreadExecutor("test", true)); + } + + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowForkedProcessService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowForkedProcessService.java new file mode 100644 index 0000000..59d4721 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowForkedProcessService.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.ServiceOperations; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test the {@link WorkflowForkedProcessService} + */ +public class TestWorkflowForkedProcessService extends WorkflowServiceTestBase { + + private WorkflowForkedProcessService process; + private File testDir = new File("target"); + private ProcessCommandFactory commandFactory; + private Map env = new HashMap<>(); + + @Before + public void setupProcesses() { + commandFactory = ProcessCommandFactory.createProcessCommandFactory(); + } + + @After + public void stopProcesses() { + ServiceOperations.stop(process); + } + + @Test + public void testLs() throws Throwable { + skipOnWindows(); + initProcess(commandFactory.ls(testDir)); + assertExecCompletes(0); + // assert that the service did not fail + assertNull(process.getFailureCause()); + } + + @Test + public void testExitCodes() throws Throwable { + skipOnWindows(); + initProcess(commandFactory.exitFalse()); + assertExecCompletes(1); + // assert that the exit code was uprated to a service failure + assertNotNull("process failure cause is null", + process.getFailureCause()); + } + + @Test + public void testEcho() throws Throwable { + skipOnWindows(); + String echoText = "hello, world"; + initProcess(commandFactory.echo(echoText)); + assertExecCompletes(0); + assertStringInOutput(echoText, getFinalOutput()); + } + + protected void assertExecCompletes(int expected) throws InterruptedException { + exec(); + assertFalse("process is still running", process.isProcessRunning()); + int exitCode = process.getExitCode(); + assertEquals("process exit code is zero: " + process.toString(), + expected, exitCode); + } + + @Test + public void testSetenv() throws Throwable { + String var = "TEST_RUN"; + String val = "TEST-RUN-ENV-VALUE"; + env.put(var, val); + initProcess(commandFactory.env()); + assertExecCompletes(0); + assertStringInOutput(val, getFinalOutput()); + } + + /** + * Get the final output. 
includes a quick sleep for the tail output + * @return the last output + */ + private List getFinalOutput() { + return process.getRecentOutput(); + } + + private WorkflowForkedProcessService initProcess(List commands) + throws IOException { + process = new WorkflowForkedProcessService(name.getMethodName(), env, + commands); + process.init(new Configuration()); + return process; + } + + public void exec() throws InterruptedException { + assertNotNull(process); + EndOfServiceWaiter waiter = new EndOfServiceWaiter(process); + process.start(); + waiter.waitForServiceToStop(5000); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowRpcService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowRpcService.java new file mode 100644 index 0000000..f00a62f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowRpcService.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * Test the {@link WorkflowRpcService}. + */ +public class TestWorkflowRpcService extends WorkflowServiceTestBase { + + @Test + public void testCreateMockRPCService() throws Throwable { + MockRPC rpc = new MockRPC(); + rpc.start(); + assertTrue(rpc.started); + assertNotNull(rpc.getListenerAddress()); + rpc.stop(); + assertTrue(rpc.stopped); + } + + @Test + public void testLifecycle() throws Throwable { + MockRPC rpc = new MockRPC(); + WorkflowRpcService svc = new WorkflowRpcService("test", rpc); + run(svc); + assertTrue(rpc.started); + assertNotNull(svc.getConnectAddress()); + svc.stop(); + assertTrue(rpc.stopped); + } + + @Test + public void testStartFailure() throws Throwable { + MockRPC rpc = new MockRPC(); + rpc.failOnStart = true; + WorkflowRpcService svc = new WorkflowRpcService("test", rpc); + svc.init(new Configuration()); + try { + svc.start(); + fail("expected an exception"); + } catch (RuntimeException e) { + assertEquals("failOnStart", e.getMessage()); + } + svc.stop(); + assertTrue(rpc.stopped); + } + + /** + * Mock RPC server; can be set to fail on startup. + */ + private static class MockRPC extends Server { + + public boolean stopped; + public boolean started; + /** + * Flag to indicate that the server should fail when started. + */ + public boolean failOnStart; + + private MockRPC() throws IOException { + super("localhost", 0, null, 1, new Configuration()); + } + + @Override + public synchronized void start() { + if (failOnStart) { + throw new RuntimeException("failOnStart"); + } + started = true; + super.start(); + } + + @Override + public synchronized void stop() { + stopped = true; + super.stop(); + } + + @Override + public synchronized InetSocketAddress getListenerAddress() { + return super.getListenerAddress(); + } + + @Override + public Writable call(RPC.RpcKind rpcKind, + String protocol, + Writable param, + long receiveTime) throws Exception { + return null; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowSequenceService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowSequenceService.java new file mode 100644 index 0000000..4fa21eb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowSequenceService.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceParent; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the {@link WorkflowSequenceService}. + */ +public class TestWorkflowSequenceService extends ParentWorkflowTestBase { + private static final Logger + LOG = LoggerFactory.getLogger(TestWorkflowSequenceService.class); + + @Test + public void testSingleSequence() throws Throwable { + ServiceParent parent = startService(new MockService()); + parent.stop(); + } + + @Test + public void testEmptySequence() throws Throwable { + ServiceParent parent = startService(); + waitForParentToStop(parent); + } + + @Test + public void testSequence() throws Throwable { + MockService one = new MockService("one", false, 100); + MockService two = new MockService("two", false, 100); + ServiceParent parent = startService(one, two); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(two); + assertEquals(two, ((WorkflowSequenceService) parent).getPreviousService()); + } + + @Test + public void testCallableChild() throws Throwable { + + MockService one = new MockService("one", false, 100); + CallableHandler handler = new CallableHandler("hello"); + WorkflowCallbackService<String> ens = + new WorkflowCallbackService<>("handler", handler, 100, true); + MockService two = new MockService("two", false, 100); + ServiceParent parent = startService(one, ens, two); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(ens); + assertStopped(two); + assertTrue(handler.notified); + String s = ens.getScheduledFuture().get(); + assertEquals("hello", s); + } + + + @Test + public void testFailingSequence() throws Throwable { + MockService one = new MockService("one", true, 100); + MockService two = new MockService("two", false, 100); + WorkflowSequenceService parent = + (WorkflowSequenceService) startService(one, two); + waitForParentToStop(parent); + assertStopped(one); + assertInState(two, Service.STATE.NOTINITED); + assertEquals(one, parent.getPreviousService()); + } + + @Test + public void testFailInStartNext() throws Throwable { + MockService one = new MockService("one", false, 100); + MockService two = new MockService("two", true, 0); + MockService three = new MockService("3", false, 0); + ServiceParent parent = startService(one, two, three); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(two); + Throwable failureCause = two.getFailureCause(); + assertNotNull(failureCause); + Throwable parentFailureCause = parent.getFailureCause(); + assertNotNull(parentFailureCause); + assertEquals(parentFailureCause, failureCause); + assertInState(three, Service.STATE.NOTINITED); + } + + @Test + public void testSequenceInSequence() throws Throwable { + MockService one = new MockService("one", false, 100); + MockService two = new MockService("two", false, 100); + ServiceParent parent = buildService(one, two); + startService(parent); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(two); + } + + @Test + public void testVarargsConstructor() throws Throwable { + MockService one = new MockService("one", false, 100); + MockService two = new MockService("two", false, 100); + ServiceParent parent = new WorkflowSequenceService("test", one, two); + parent.init(new Configuration()); + parent.start(); + waitForParentToStop(parent); + assertStopped(one); + assertStopped(two); + } + +
@Test + public void testAddChild() throws Throwable { + MockService one = new MockService("one", false, 5000); + MockService two = new MockService("two", false, 100); + ServiceParent parent = startService(one, two); + CallableHandler handler = new CallableHandler("hello"); + WorkflowCallbackService<String> ens = + new WorkflowCallbackService<>("handler", handler, 100, true); + parent.addService(ens); + waitForParentToStop(parent, 10000); + assertStopped(one); + assertStopped(two); + assertStopped(ens); + assertEquals("hello", ens.getScheduledFuture().get()); + } + + public WorkflowSequenceService buildService(Service... services) { + WorkflowSequenceService parent = + new WorkflowSequenceService("test", services); + parent.init(new Configuration()); + return parent; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowServiceTerminatingRunnable.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowServiceTerminatingRunnable.java new file mode 100644 index 0000000..a66ef53 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowServiceTerminatingRunnable.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.service.workflow; + +import org.junit.Test; + +/** + * Test the {@link ServiceTerminatingRunnable}. + */ +public class TestWorkflowServiceTerminatingRunnable extends WorkflowServiceTestBase { + + @Test(expected = IllegalArgumentException.class) + public void testNoService() throws Throwable { + new ServiceTerminatingRunnable(null, new SimpleRunnable()); + } + + @Test + public void testBasicRun() throws Throwable { + + WorkflowCompositeService svc = run(new WorkflowCompositeService()); + ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc, + new SimpleRunnable()); + + // synchronous in-thread execution + runnable.run(); + assertStopped(svc); + } + + @Test + public void testFailureRun() throws Throwable { + + WorkflowCompositeService svc = run(new WorkflowCompositeService()); + ServiceTerminatingRunnable runnable = + new ServiceTerminatingRunnable(svc, new SimpleRunnable(true)); + + // synchronous in-thread execution + runnable.run(); + assertStopped(svc); + assertNotNull(runnable.getException()); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/WorkflowServiceTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/WorkflowServiceTestBase.java new file mode 100644 index 0000000..3f0fa32 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/WorkflowServiceTestBase.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.service.workflow; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceParent; +import org.apache.hadoop.util.Shell; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.Callable; + +/** + * Test base for workflow service tests.
+ */ +public abstract class WorkflowServiceTestBase extends Assert { + private static final Logger + LOG = LoggerFactory.getLogger(WorkflowServiceTestBase.class); + + /** + * Set the timeout for every test. + */ + @Rule + public Timeout testTimeout = new Timeout(15000); + + @Rule + public TestName name = new TestName(); + + @Before + public void nameThread() { + Thread.currentThread().setName("JUnit"); + } + + /** + * Assert that a service is in an expected state. + * @param service service + * @param expected expected state + */ + protected void assertInState(Service service, Service.STATE expected) { + Service.STATE actual = service.getServiceState(); + if (actual != expected) { + fail("Service " + service.getName() + " in state " + actual + + " - expected " + expected); + } + } + + /** + * Assert that a service has stopped. + * @param service service to check + */ + protected void assertStopped(Service service) { + assertInState(service, Service.STATE.STOPPED); + } + + /** + * Log the state of a service parent, and that of all its children. + * @param parent the parent service + */ + protected void logState(ServiceParent parent) { + logService(parent); + for (Service s : parent.getServices()) { + logService(s); + } + } + + /** + * Log details about a service, including any failure cause. + * @param service service to log + */ + protected void logService(Service service) { + LOG.info(service.toString()); + Throwable failureCause = service.getFailureCause(); + if (failureCause != null) { + LOG.info("Failed in state {} with {}", service.getFailureState(), + failureCause, failureCause); + } + } + + /** + * Init and start a service. + * @param svc the service + * @return the service + */ + protected <S extends Service> S run(S svc) { + svc.init(new Configuration()); + svc.start(); + return svc; + } + + /** + * Handler for callable events. + */ + public static class CallableHandler implements Callable<String> { + public volatile boolean notified = false; + public final String result; + + public CallableHandler(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + LOG.info("CallableHandler::call"); + notified = true; + return result; + } + } + + /** + * Assert that a string is in an output list. Fails fast if the output + * list is empty. + * @param text text to scan for + * @param output list of output lines + */ + public void assertStringInOutput(String text, List<String> output) { + assertFalse("Empty output list", output.isEmpty()); + boolean found = false; + StringBuilder builder = new StringBuilder(); + for (String s : output) { + builder.append(s).append('\n'); + if (s.contains(text)) { + found = true; + break; + } + } + + if (!found) { + String message = + "Text \"" + text + "\" not found in " + output.size() + " lines\n"; + fail(message + builder); + } + } + + + /** + * Skip a test when running on Windows. + */ + public static void skipOnWindows() { + if (Shell.WINDOWS) { + skip("Not supported on windows"); + } + } + + public static void skip(String message) { + LOG.warn("Skipping test: {}", message); + Assume.assumeTrue(message, false); + } +}
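
Each of the new test classes above builds on this base class. As an illustration only (not part of the patch), a minimal test written against it might look like the following sketch; it assumes nothing beyond the helpers shown above (run(), assertStopped()) and the no-argument WorkflowCompositeService constructor already exercised in TestWorkflowServiceTerminatingRunnable. The class name TestWorkflowServiceTestBaseExample is hypothetical.

package org.apache.hadoop.service.workflow;

import org.junit.Test;

// Hypothetical example, not part of the patch: exercises the base-class helpers.
public class TestWorkflowServiceTestBaseExample extends WorkflowServiceTestBase {

  @Test
  public void testRunAndStop() throws Throwable {
    // init + start through the base class run() helper
    WorkflowCompositeService svc = run(new WorkflowCompositeService());
    // stopping the composite should leave it in the STOPPED state
    svc.stop();
    assertStopped(svc);
  }
}

The base-class Timeout rule bounds the test at 15 seconds, and the TestName rule supplies a per-test name, exactly as in the tests included in this patch.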