commit 1dccb7301f61f2087a694299da68f94fdbe48d60 Author: Todd Lipcon Date: Wed Apr 15 17:12:19 2009 -0700 HADOOP-5640: Add dispatch mechanism for services to notify plugins of important events. diff --git src/core/org/apache/hadoop/util/PluginDispatcher.java src/core/org/apache/hadoop/util/PluginDispatcher.java new file mode 100644 index 0000000..8dbe171 --- /dev/null +++ src/core/org/apache/hadoop/util/PluginDispatcher.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.conf.Configuration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +/** + * Provides convenience functions for dispatching calls through + * to plugins registered with a class. Classes that wish to provide + * plugin interfaces should use this class to load the plugin list + * from the Configuration and to dispatch calls to the loaded instances. + * + * Calls dispatched through this class are performed on a second thread + * so as to not block execution of the plugged service. 
+ */ +public class PluginDispatcher { + public static final Log LOG = LogFactory.getLog(PluginDispatcher.class.getName()); + + private final List plugins; + private Executor executor; + + /** + * Load a PluginDispatcher from the given Configuration. The start() + * callback will not be automatically called. + * + * @param conf the Configuration from which to load + * @param key the configuration key that lists class names to instantiate + * @param clazz the class or interface from which plugins must extend + */ + public static PluginDispatcher createFromConfiguration( + Configuration conf, String key, Class clazz) { + List plugins = conf.getInstances(key, clazz); + return new PluginDispatcher(plugins); + } + + PluginDispatcher(Collection plugins) { + this.plugins = Collections.synchronizedList(new ArrayList(plugins)); + executor = Executors.newSingleThreadExecutor(); + } + + PluginDispatcher(Collection plugins, Executor executor) { + this.plugins = Collections.synchronizedList(new ArrayList(plugins)); + this.executor = executor; + } + + /** + * Dispatch a call to all active plugins. The call is handed to the executor, and the loop runs over a snapshot of the plugin list taken when the task executes, so a callback may remove a plugin from the list without cutting the iteration short or triggering a ConcurrentModificationException. + * + * Exceptions will be caught and logged at WARN level. + * + * @param callback a function which will run once for each plugin, with + * that plugin as the argument + */ + public void dispatchCall(final SingleArgumentRunnable callback) { + executor.execute(new Runnable() { + public void run() { + for (T plugin : new ArrayList<T>(plugins)) { + try { + callback.run(plugin); + } catch (Throwable t) { + LOG.warn("Uncaught exception dispatching to plugin " + plugin, t); + } + } + }}); + } + + /** + * Dispatches the start(...) hook common to all ServicePlugins. This + * also automatically removes any plugin that throws an exception while + * attempting to start. 
+ * A start failure is logged at ERROR level, and the plugin is removed from inside the dispatched callback, i.e. while the dispatchCall loop is still running. + * + * @param plugPoint passed to ServicePlugin.start() + */ + public void dispatchStart(final Object plugPoint) { + dispatchCall( + new SingleArgumentRunnable() { + public void run(T p) { + try { + p.start(plugPoint); + } catch (Throwable t) { + LOG.error("ServicePlugin " + p + " could not be started. " + + "Removing from future callbacks.", t); + plugins.remove(p); + } + } + }); + } + + /** + * Convenience function for dispatching the stop() hook common to all + * ServicePlugins. A plugin whose stop() throws is logged at WARN level and stays registered. + */ + public void dispatchStop() { + dispatchCall( + new SingleArgumentRunnable() { + public void run(T p) { + try { + p.stop(); + } catch (Throwable t) { + LOG.warn("ServicePlugin " + p + " could not be stopped", t); + } + } + }); + } +} diff --git src/core/org/apache/hadoop/util/ServicePlugin.java src/core/org/apache/hadoop/util/ServicePlugin.java index a83294e..c2543be 100644 --- src/core/org/apache/hadoop/util/ServicePlugin.java +++ src/core/org/apache/hadoop/util/ServicePlugin.java @@ -35,6 +35,9 @@ public interface ServicePlugin extends Closeable { /** * This method is invoked when the service instance has been started. * + * If the plugin fails to initialize and throws an exception, the + * PluginDispatcher instance will log an error and remove the plugin. + * * @param service The service instance invoking this method */ void start(Object service); diff --git src/core/org/apache/hadoop/util/SingleArgumentRunnable.java src/core/org/apache/hadoop/util/SingleArgumentRunnable.java new file mode 100644 index 0000000..0c5e445 --- /dev/null +++ src/core/org/apache/hadoop/util/SingleArgumentRunnable.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +/** + * Simple interface for a Runnable that takes a single argument. NOTE(review): run() takes an argument of type T, but no type parameter is declared on the interface in this copy of the patch; presumably the declaration was lost in transit - confirm against the original commit. + */ +public interface SingleArgumentRunnable { + public void run(T arg); +} \ No newline at end of file diff --git src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 19125d4..2111dc8 100644 --- src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -93,8 +93,10 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DiskChecker; -import org.apache.hadoop.util.ServicePlugin; +import org.apache.hadoop.util.PluginDispatcher; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.ServicePlugin; +import org.apache.hadoop.util.SingleArgumentRunnable; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; @@ -203,7 +205,7 @@ public class DataNode extends Configured public Daemon blockScannerThread = null; /** Activated plug-ins. 
*/ - private List plugins; + private PluginDispatcher pluginDispatcher; private static final Random R = new Random(); @@ -403,15 +405,9 @@ public class DataNode extends Configured LOG.info("dnRegistration = " + dnRegistration); - plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class); - for (ServicePlugin p: plugins) { - try { - p.start(this); - LOG.info("Started plug-in " + p); - } catch (Throwable t) { - LOG.warn("ServicePlugin " + p + " could not be started", t); - } - } + pluginDispatcher = PluginDispatcher.createFromConfiguration( + conf, "dfs.datanode.plugins", DatanodePlugin.class); + pluginDispatcher.dispatchStart(this); } /** @@ -556,6 +552,7 @@ public class DataNode extends Configured } catch (InterruptedException ie) {} } } + assert ("".equals(storage.getStorageID()) && !"".equals(dnRegistration.getStorageID())) || storage.getStorageID().equals(dnRegistration.getStorageID()) : @@ -604,16 +601,10 @@ public class DataNode extends Configured * Otherwise, deadlock might occur. 
*/ public void shutdown() { - if (plugins != null) { - for (ServicePlugin p : plugins) { - try { - p.stop(); - LOG.info("Stopped plug-in " + p); - } catch (Throwable t) { - LOG.warn("ServicePlugin " + p + " could not be stopped", t); - } - } + if (pluginDispatcher != null) { + pluginDispatcher.dispatchStop(); } + if (infoServer != null) { try { @@ -868,6 +859,10 @@ public class DataNode extends Configured LOG.info("DatanodeCommand action: DNA_REGISTER"); if (shouldRun) { register(); + pluginDispatcher.dispatchCall( + new SingleArgumentRunnable() { + public void run(DatanodePlugin p) { p.reregistrationComplete(); } + }); } break; case DatanodeProtocol.DNA_FINALIZE: @@ -1287,6 +1282,10 @@ public class DataNode extends Configured if (dn != null) { //register datanode dn.register(); + dn.pluginDispatcher.dispatchCall( + new SingleArgumentRunnable() { + public void run(DatanodePlugin p) { p.initialRegistrationComplete(); } + }); dn.dataNodeThread = new Thread(dn, dnThreadName); dn.dataNodeThread.setDaemon(true); // needed for JUnit testing dn.dataNodeThread.start(); diff --git src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodePlugin.java src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodePlugin.java new file mode 100644 index 0000000..feebd70 --- /dev/null +++ src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodePlugin.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.util.ServicePlugin; + +public abstract class DatanodePlugin implements ServicePlugin { + + // ServicePlugin hooks + + /** {@inheritDoc} */ + public abstract void start(Object obj); + + /** {@inheritDoc} */ + public abstract void stop(); + + // Datanode specific hooks + + /** + * The DataNode has registered itself with the NameNode during the initial + * startup sequence. This is called before the DataNode Thread has started + * running. + */ + public void initialRegistrationComplete() {} + + /** + * The DataNode has re-registered itself with the NameNode in response to a + * DNA_REGISTER event. This occurs when the NN has lost and regained contact + * with the DataNode. 
+ */ + public void reregistrationComplete() {} +} \ No newline at end of file diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java index e36fffa..5e5423b 100644 --- src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -75,6 +75,7 @@ import org.apache.hadoop.security.authorize.ConfiguredPolicy; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +import org.apache.hadoop.util.PluginDispatcher; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; @@ -158,8 +159,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, protected NamenodeRegistration nodeRegistration; /** Is service level authorization enabled? */ private boolean serviceAuthEnabled = false; + /** Activated plug-ins. */ - private List plugins; + private PluginDispatcher pluginDispatcher; /** Format a new filesystem. Destroys any filesystem that may already * exist at this location. 
**/ @@ -292,15 +294,10 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, startHttpServer(conf); server.start(); //start RPC server startTrashEmptier(conf); - - plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class); - for (ServicePlugin p: plugins) { - try { - p.start(this); - } catch (Throwable t) { - LOG.warn("ServicePlugin " + p + " could not be started", t); - } - } + + pluginDispatcher = PluginDispatcher.createFromConfiguration( + conf, "dfs.namenode.plugins", NamenodePlugin.class); + pluginDispatcher.dispatchStart(this); } private void startTrashEmptier(Configuration conf) throws IOException { @@ -410,14 +407,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, if (stopRequested) return; stopRequested = true; - if (plugins != null) { - for (ServicePlugin p : plugins) { - try { - p.stop(); - } catch (Throwable t) { - LOG.warn("ServicePlugin " + p + " could not be stopped", t); - } - } + if (pluginDispatcher != null) { + pluginDispatcher.dispatchStop(); } try { if (httpServer != null) httpServer.stop(); diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodePlugin.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodePlugin.java new file mode 100644 index 0000000..f6c6d87 --- /dev/null +++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodePlugin.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.util.ServicePlugin; + +public abstract class NamenodePlugin implements ServicePlugin { + // ServicePlugin hooks + + /** {@inheritDoc} */ + public abstract void start(Object obj); + + /** {@inheritDoc} */ + public abstract void stop(); +} \ No newline at end of file diff --git src/test/core/org/apache/hadoop/util/TestPluginDispatcher.java src/test/core/org/apache/hadoop/util/TestPluginDispatcher.java new file mode 100644 index 0000000..474c9af --- /dev/null +++ src/test/core/org/apache/hadoop/util/TestPluginDispatcher.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.util; + + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Executor; + +import junit.framework.TestCase; + +public class TestPluginDispatcher extends TestCase { + + /** + * Ensure that dispatch works in general: a single registered plugin receives exactly one call. + */ + public void testDispatch() { + AtomicInteger runCount = new AtomicInteger(0); + + List plugins = Arrays.asList( + new TestPlugin[] { + new TestPlugin(runCount) + }); + + PluginDispatcher dispatcher = + new PluginDispatcher(plugins, new SameThreadExecutor()); + + dispatcher.dispatchCall(new RunMethodRunner()); + assertEquals(1, runCount.get()); + } + + /** + * Ensure that, if a plugin is faulty during startup, it is removed + * from the plugin dispatcher. + */ + public void testRemovalOnStartError() { + AtomicInteger runCount = new AtomicInteger(0); + + List plugins = Arrays.asList( + new TestPlugin[] { + new TestPlugin(runCount), + new FaultyPlugin(runCount), + new TestPlugin(runCount) + }); + + PluginDispatcher dispatcher = + new PluginDispatcher(plugins, new SameThreadExecutor()); + + // Before we start the plugins, we can dispatch a call + // and it goes to all 3 plugins + dispatcher.dispatchCall(new RunMethodRunner()); + assertEquals(3, runCount.get()); + + // When we dispatch the start, it should kill the faulty plugin + runCount.set(0); + dispatcher.dispatchStart(this); + dispatcher.dispatchCall(new RunMethodRunner()); + assertEquals(2, runCount.get()); + } + + /** + * SingleArgumentRunnable that just calls plugin.run() + */ + static class RunMethodRunner implements SingleArgumentRunnable { + public void run(TestPlugin p) { + p.run(); + } + } + + /** + * Plugin which increments a counter when its run method is called. 
+ */ + public static class TestPlugin implements ServicePlugin { + final AtomicInteger ai; + + public TestPlugin(AtomicInteger ai) { + this.ai = ai; + } + public void start(Object service) {} + public void stop() {} + public void close() {} + public void run() { + ai.getAndIncrement(); + } + } + + /** + * Plugin which throws a RuntimeException on start. Its run() is inherited from TestPlugin, so it still increments the shared counter for as long as it receives calls. + */ + public static class FaultyPlugin extends TestPlugin { + public FaultyPlugin(AtomicInteger ai) { + super(ai); + } + public void start(Object service) { + throw new RuntimeException("Kaboom!"); + } + } + + /** + * Executor which runs Runnables in the same thread that submits them, so the submitted work has finished by the time execute() returns. + */ + public static class SameThreadExecutor implements Executor { + public void execute(Runnable r) { + r.run(); + } + } +} \ No newline at end of file