commit 76157380e097c7726b7ca05f4a561d6b5c836078 Author: Todd Lipcon Date: Wed Jul 1 13:15:44 2009 -0700 HADOOP-5640. Allow ServicePlugins to hook callbacks into key service events diff --git src/java/org/apache/hadoop/util/ServicePlugin.java src/java/org/apache/hadoop/util/ServicePlugin.java index a83294e..ed51ae3 100644 --- src/java/org/apache/hadoop/util/ServicePlugin.java +++ src/java/org/apache/hadoop/util/ServicePlugin.java @@ -35,6 +35,9 @@ public interface ServicePlugin extends Closeable { /** * This method is invoked when the service instance has been started. * + * If the plugin fails to initialize and throws an exception, the + * ServicePluginDispatcher instance will log an error and remove the plugin. + * * @param service The service instance invoking this method */ void start(Object service); diff --git src/java/org/apache/hadoop/util/ServicePluginDispatcher.java src/java/org/apache/hadoop/util/ServicePluginDispatcher.java new file mode 100644 index 0000000..ca1c758 --- /dev/null +++ src/java/org/apache/hadoop/util/ServicePluginDispatcher.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Provides convenience functions for dispatching calls through
+ * to plugins registered with a class. Classes that wish to provide
+ * plugin interfaces should use this class to load the plugin list
+ * from the Configuration and to dispatch calls to the loaded instances.
+ *
+ * Calls dispatched through this class are performed on a second thread
+ * so as to not block execution of the plugged service.
+ */
+public class ServicePluginDispatcher<T extends ServicePlugin> {
+  public static final Log LOG = LogFactory.getLog(
+    ServicePluginDispatcher.class.getName());
+
+  private final List<T> plugins;
+  private final Executor executor;
+
+  /**
+   * Load a ServicePluginDispatcher from the given Configuration. The start()
+   * callback will not be automatically called.
+   *
+   * @param conf the Configuration from which to load
+   * @param key the configuration key that lists class names to instantiate
+   * @param clazz the class or interface from which plugins must extend
+   * @return a dispatcher for the plugin instances loaded from conf
+   */
+  public static <X extends ServicePlugin>
+  ServicePluginDispatcher<X> createFromConfiguration(
+    Configuration conf, String key, Class<X> clazz) {
+    List<X> plugins = conf.getInstances(key, clazz);
+    return new ServicePluginDispatcher<X>(plugins);
+  }
+
+  ServicePluginDispatcher(Collection<T> plugins) {
+    this(plugins, Executors.newSingleThreadExecutor());
+  }
+
+  ServicePluginDispatcher(Collection<T> plugins, Executor executor) {
+    this.plugins = Collections.synchronizedList(new ArrayList<T>(plugins));
+    this.executor = executor;
+  }
+
+  /**
+   * Dispatch a call to all active plugins.
+   *
+   * Exceptions will be caught and logged at WARN level.
+   *
+   * @param callback a function which will run once for each plugin, with
+   * that plugin as the argument
+   */
+  public void dispatchCall(final SingleArgumentRunnable<T> callback) {
+    executor.execute(new Runnable() {
+      public void run() {
+        // Walk a snapshot rather than the live list: a callback may remove
+        // plugins (see dispatchStart), which must not disturb this loop.
+        for (T plugin : new ArrayList<T>(plugins)) {
+          try {
+            callback.run(plugin);
+          } catch (Throwable t) {
+            LOG.warn("Uncaught exception dispatching to plugin " + plugin, t);
+          }
+        }
+      }});
+  }
+
+  /**
+   * Dispatches the start(...) hook common to all ServicePlugins. This
+   * also automatically removes any plugin that throws an exception while
+   * attempting to start.
+   *
+   * @param plugPoint passed to ServicePlugin.start()
+   */
+  public void dispatchStart(final Object plugPoint) {
+    dispatchCall(new SingleArgumentRunnable<T>() {
+      public void run(T p) {
+        try {
+          p.start(plugPoint);
+        } catch (Throwable t) {
+          LOG.error("ServicePlugin " + p + " could not be started. " +
+                    "Removing from future callbacks.", t);
+          plugins.remove(p);
+        }
+      }
+    });
+  }
+
+  /**
+   * Convenience function for dispatching the stop() hook common to all
+   * ServicePlugins.
+   */
+  public void dispatchStop() {
+    dispatchCall(new SingleArgumentRunnable<T>() {
+      public void run(T p) {
+        try {
+          p.stop();
+        } catch (Throwable t) {
+          LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+        }
+      }
+    });
+  }
+}
diff --git src/java/org/apache/hadoop/util/SingleArgumentRunnable.java src/java/org/apache/hadoop/util/SingleArgumentRunnable.java
new file mode 100644
index 0000000..0c5e445
--- /dev/null
+++ src/java/org/apache/hadoop/util/SingleArgumentRunnable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * Simple interface for a Runnable that takes a single argument.
+ *
+ * @param <T> the type of the argument handed to run()
+ */
+public interface SingleArgumentRunnable<T> {
+  /**
+   * Runs this callback against the given argument.
+   *
+   * @param arg the value to operate on
+   */
+  public void run(T arg);
+}
diff --git src/test/core/org/apache/hadoop/util/TestServicePluginDispatcher.java src/test/core/org/apache/hadoop/util/TestServicePluginDispatcher.java
new file mode 100644
index 0000000..eeefd68
--- /dev/null
+++ src/test/core/org/apache/hadoop/util/TestServicePluginDispatcher.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executor;
+
+import junit.framework.TestCase;
+
+public class TestServicePluginDispatcher extends TestCase {
+
+  /**
+   * Ensure that dispatch works in general
+   */
+  public void testDispatch() {
+    AtomicInteger runCount = new AtomicInteger(0);
+
+    List<TestPlugin> plugins = Arrays.asList(
+      new TestPlugin[] {
+        new TestPlugin(runCount)
+      });
+
+    ServicePluginDispatcher<TestPlugin> dispatcher =
+      new ServicePluginDispatcher<TestPlugin>(plugins, new SameThreadExecutor());
+
+    dispatcher.dispatchCall(new RunMethodRunner());
+    assertEquals(1, runCount.get());
+  }
+
+  /**
+   * Ensure that, if a plugin is faulty during startup, it is removed
+   * from the plugin dispatcher.
+   *
+   * NOTE(review): only run() calls are counted; healthy start() calls are not.
+   */
+  public void testRemovalOnStartError() {
+    AtomicInteger runCount = new AtomicInteger(0);
+
+    List<TestPlugin> plugins = Arrays.asList(
+      new TestPlugin[] {
+        new TestPlugin(runCount),
+        new FaultyPlugin(runCount),
+        new TestPlugin(runCount)
+      });
+
+    ServicePluginDispatcher<TestPlugin> dispatcher =
+      new ServicePluginDispatcher<TestPlugin>(plugins, new SameThreadExecutor());
+
+    // Before the plugins are started, a call reaches all 3 of them
+    dispatcher.dispatchCall(new RunMethodRunner());
+    assertEquals(3, runCount.get());
+
+    // When we dispatch the start, it should kill the faulty plugin
+    runCount.set(0);
+    dispatcher.dispatchStart(this);
+    dispatcher.dispatchCall(new RunMethodRunner());
+    assertEquals(2, runCount.get());
+  }
+
+  /**
+   * SingleArgumentRunnable that just calls plugin.run()
+   */
+  static class RunMethodRunner implements SingleArgumentRunnable<TestPlugin> {
+    public void run(TestPlugin p) {
+      p.run();
+    }
+  }
+
+  /**
+   * Plugin which increments a counter when its run method is called.
+   */
+  public static class TestPlugin implements ServicePlugin {
+    final AtomicInteger ai;
+
+    public TestPlugin(AtomicInteger ai) {
+      this.ai = ai;
+    }
+    public void start(Object service) {}
+    public void stop() {}
+    public void close() {}
+    public void run() {
+      ai.getAndIncrement();
+    }
+  }
+
+  /**
+   * Plugin which throws a RuntimeException on start.
+   */
+  public static class FaultyPlugin extends TestPlugin {
+    public FaultyPlugin(AtomicInteger ai) {
+      super(ai);
+    }
+    public void start(Object service) {
+      throw new RuntimeException("Kaboom!");
+    }
+  }
+
+  /**
+   * Executor which runs Runnables in the same thread that submits them.
+   */
+  public static class SameThreadExecutor implements Executor {
+    public void execute(Runnable r) {
+      r.run();
+    }
+  }
+}