diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/IrqHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/IrqHandler.java new file mode 100644 index 0000000..f7ea66f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/IrqHandler.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import sun.misc.Signal; +import sun.misc.SignalHandler; + +import java.io.IOException; + +/** + * This class bundles up all the compiler warnings about abuse of sun.misc + * interrupt handling code + * into one place. + */ +public final class IrqHandler implements SignalHandler { + + public static final String CONTROL_C = "INT"; + + private final String name; + private final Interrupted handler; + + /** + * Create an IRQ handler bound to the specific interrupt + * @param name signal name + * @param handler handler + * @throws IOException + */ + public IrqHandler(String name, Interrupted handler) throws IOException { + this.handler = handler; + this.name = name; + try { + Signal.handle(new Signal(name), this); + } catch (IllegalArgumentException e) { + throw new IOException( + "Could not set handler for signal \"" + name + "\"." + + "This can happen if the JVM has the -Xrs set.", + e); + } + } + + @Override + public String toString() { + return "IrqHandler for signal " + name ; + } + + /** + * Handler for the JVM API for signal handling + * @param signal signal raised + */ + @Override + public void handle(Signal signal) { + InterruptData data = new InterruptData(signal.getName(), signal.getNumber()); + handler.interrupted(data); + } + + /** + * Interrupt data to pass on. + */ + public static class InterruptData { + public final String name; + public final int number; + + public InterruptData(String name, int number) { + this.name = name; + this.number = number; + } + + @Override + public String toString() { + return "signal " + name + '(' + number + ')'; + } + } + + /** + * Callback on interruption + */ + public interface Interrupted { + + /** + * Handle an interrupt + * @param interruptData data + */ + void interrupted(InterruptData interruptData); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceLauncher.java new file mode 100644 index 0000000..bf6d148 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceLauncher.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * A class to launch any service by name + */ +public class ServiceLauncher + implements IrqHandler.Interrupted { + private static final Log LOG = LogFactory.getLog(AbstractService.class); + protected static final int PRIORITY = 30; + + /** + * Exit code when an exception was thrown from the service: {@value} + */ + public static final int EXIT_EXCEPTION_THROWN = 1; + + /** + * Exit code when a usage message was printed: {@value} + */ + public static final int EXIT_USAGE = 2; + + /** + * Exit code when a control-C, kill -3, signal was picked up: {@value} + */ + public static final int EXIT_INTERRUPTED = 3; + + private volatile Service service; + private final List interruptHandlers = new ArrayList(1); + private Configuration configuration; + private String serviceClassName; + + /** + * Create an instance of the launcher + * @param serviceClassName classname of the service + */ + public ServiceLauncher(String serviceClassName) { + this.serviceClassName = serviceClassName; + } + + /** + * Launch the service. All exceptions that occur are propagated upwards. + * + * @param conf configuration + * @throws ClassNotFoundException classname not on the classpath + * @throws IllegalAccessException not allowed at the class + * @throws InstantiationException not allowed to instantiate it + * @throws InterruptedException thread interrupted + * @throws IOException any IO exception + */ + public void launchService(Configuration conf) + throws ClassNotFoundException, IllegalAccessException, + InstantiationException, InterruptedException, + IOException { + + configuration = conf; + + //Instantiate the class -this requires the service to have a public zero-argument constructor + Class serviceClass = this.getClass().getClassLoader().loadClass(serviceClassName); + Object instance = serviceClass.newInstance(); + if (!(instance instanceof Service)) { + //not a service + throw new ServiceStateException("Not a Service: " + serviceClassName); + } + + service = (Service) instance; + + //Register the interrupt handlers + interruptHandlers.add(new IrqHandler(IrqHandler.CONTROL_C, this)); + //and the shutdown hook + ServiceOperations.ServiceShutdownHook shutdownHook = + new ServiceOperations.ServiceShutdownHook(service); + ShutdownHookManager.get().addShutdownHook( + shutdownHook, PRIORITY); + service.init(configuration); + service.start(); + service.waitForServiceToStop(0); + } + + + /** + * The service has been interrupted. Again, trigger something resembling an elegant shutdown; + * give the service 30s to do this before the exit operation is called + * @param interruptData the interrupted flag. + */ + @Override + public void interrupted(IrqHandler.InterruptData interruptData) { + boolean controlC = IrqHandler.CONTROL_C.equals(interruptData.name); + int shutdownTimeMillis = 30 * 1000; + //start an async shutdown thread with a timeout + boolean serviceStopped; + + + ServiceForcedShutdown forcedShutdown = + new ServiceForcedShutdown(shutdownTimeMillis); + Thread thread = new Thread(forcedShutdown); + thread.start(); + //wait for that thread to finish + try { + thread.join(shutdownTimeMillis); + } catch (InterruptedException e) { + //ignored + } + if (!forcedShutdown.isServiceStopped()) { + LOG.warn("Service did not shut down in time"); + } + int exitCode = controlC ? 0 : EXIT_INTERRUPTED; + ExitUtil.terminate(exitCode); + } + + /** + * forced shutdown runnable. + */ + private class ServiceForcedShutdown implements Runnable { + + private final int shutdownTimeMillis; + private boolean serviceStopped; + + public ServiceForcedShutdown(int shutdownTimeoutMillis) { + this.shutdownTimeMillis = shutdownTimeoutMillis; + } + + @Override + public void run() { + if (service != null) { + service.stop(); + serviceStopped = service.waitForServiceToStop(shutdownTimeMillis); + } else { + serviceStopped = true; + } + } + + private boolean isServiceStopped() { + return serviceStopped; + } + } + + /** + * Get the service name via {@link Service#getName()}. + * If the service is not instantiated, the classname is returned instead. + * @return the service name + */ + public String getServiceName() { + Service s = service; + if (s != null) { + return "service " + s.getName(); + } else { + return "service classname " + serviceClassName; + } + } + + /** + * Launch a service catching all excpetions and downgrading them to exit codes + * @param out output stream for printing errors to (alongside the logging back + * end) + * @param configuration configuration to use + * @return an exit code. + */ + public int launchServiceRobustly(PrintStream out, Configuration configuration) { + try { + launchService(configuration); + if (service != null) { + Throwable failure = service.getFailureCause(); + if (failure != null) { + Service.STATE failureState = service.getFailureState(); + if (failureState == Service.STATE.STOPPED) { + //the failure occurred during shutdown, not important enough to bother + //the user as it may just scare them + LOG.debug("Failure during shutdown", failure); + } else { + throw failure; + } + } + } + //either the service succeeded, or an error was only raised during shutdown, + //which we don't worry that much about + return 0; + } catch (Throwable thrown) { + LOG.error("While running " + getServiceName(), thrown); + out.println("While running " + getServiceName() + + ": " + thrown); + return EXIT_EXCEPTION_THROWN; + } + } + + /** + * Print a log message for starting up and shutting down. + * This was grabbed from the ToolRunner code. + * @param classname the class of the server + * @param args arguments + * @param log the target log object + */ + public static void startupShutdownMessage(String classname, + String[] args, + Log log) { + final String hostname = NetUtils.getHostname(); + log.info( + toStartupShutdownString("STARTUP_MSG: ", new String[]{ + "Starting " + classname, + " host = " + hostname, + " args = " + Arrays.asList(args), + " version = " + VersionInfo.getVersion(), + " classpath = " + System.getProperty("java.class.path"), + " build = " + VersionInfo.getUrl() + " -r " + + VersionInfo.getRevision() + + "; compiled by '" + VersionInfo.getUser() + + "' on " + VersionInfo.getDate(), + " java = " + System.getProperty("java.version")} + )); + } + + private static String toStartupShutdownString(String prefix, String[] msg) { + StringBuilder b = new StringBuilder(prefix); + b.append("\n/************************************************************"); + for (String s : msg) { + b.append("\n").append(prefix).append(s); + } + b.append("\n************************************************************/"); + return b.toString(); + } + + public static void main(String[] args) { + if (args.length < 1) { + System.err.println("Usage: ServiceLauncher classname "); + ExitUtil.terminate(EXIT_USAGE); + } + String serviceClassName = args[0]; + + startupShutdownMessage(serviceClassName, args, LOG); + Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + ServiceLauncher serviceLauncher = new ServiceLauncher(serviceClassName); + //Currently the config just the default + Configuration conf = new Configuration(); + serviceLauncher.launchServiceRobustly(System.err, conf); + } + +}