diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorClassLoader.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorClassLoader.java index 7d0ba92..ab01e4a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorClassLoader.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorClassLoader.java @@ -65,9 +65,13 @@ public class CoprocessorClassLoader extends URLClassLoader { "org.w3c", "org.xml", "sunw.", - // Hadoop/HBase: - "org.apache.hadoop", + // logging + "org.apache.commons.logging", + "org.apache.log4j", "com.hadoop", + // Hadoop/HBase/ZK: + "org.apache.hadoop", + "org.apache.zookeeper", }; /** @@ -80,7 +84,12 @@ public class CoprocessorClassLoader extends URLClassLoader { new Pattern[] { Pattern.compile("^[^-]+-default\\.xml$") }; - + + /** + * Parent classloader used to load any class not matching the exemption list. + */ + private ClassLoader parent; + /** * Creates a CoprocessorClassLoader that loads classes from the given paths. * @param paths paths from which to load classes. @@ -88,8 +97,12 @@ public class CoprocessorClassLoader extends URLClassLoader { */ public CoprocessorClassLoader(List paths, ClassLoader parent) { super(paths.toArray(new URL[]{}), parent); + this.parent = parent; + if (parent == null) { + throw new IllegalArgumentException("No parent classloader!"); + } } - + @Override synchronized public Class loadClass(String name) throws ClassNotFoundException { @@ -99,9 +112,9 @@ public class CoprocessorClassLoader extends URLClassLoader { LOG.debug("Skipping exempt class " + name + " - delegating directly to parent"); } - return super.loadClass(name); + return parent.loadClass(name); } - + // Check whether the class has already been loaded: Class clasz = findLoadedClass(name); if (clasz != null) { @@ -123,7 +136,7 @@ public class CoprocessorClassLoader extends URLClassLoader { LOG.debug("Class " + name + " not found - delegating to parent"); } try { - clasz = super.loadClass(name); + clasz = parent.loadClass(name); } catch (ClassNotFoundException e2) { // Class not found in this ClassLoader or in the parent ClassLoader // Log some debug output before rethrowing ClassNotFoundException diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 49d4c0f..f1d9a26 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor; +import com.google.common.collect.MapMaker; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; @@ -48,6 +49,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.net.URL; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.jar.JarEntry; import java.util.jar.JarFile; @@ -78,6 +80,11 @@ public abstract class CoprocessorHost { protected String pathPrefix; protected volatile int loadSequence; + // classloader caching + protected Set activeCoprocessorClassLoaders = new HashSet(); + static ConcurrentMap classLoadersCache = new MapMaker() + .concurrencyLevel(3).weakValues().makeMap(); + public CoprocessorHost() { pathPrefix = UUID.randomUUID().toString(); } @@ -150,6 +157,19 @@ public abstract class CoprocessorHost { coprocessors.addAll(configured); } + // clears classLoadersCache. Used by unit tests + static void clearCacheForTesting() { + classLoadersCache.clear(); + } + /* + * @param path + * @return the number of class loaders corresponding to path + */ + static int getClassloaderCountForTesting(Path path) { + ClassLoader cl = classLoadersCache.get(path); + return cl == null ? 0 : 1; + } + /** * Load a coprocessor implementation into the host * @param path path to implementation jar @@ -165,14 +185,27 @@ public abstract class CoprocessorHost { LOG.debug("Loading coprocessor class " + className + " with path " + path + " and priority " + priority); - // Have we already loaded the class, perhaps from an earlier region open - // for the same table? - try { - implClass = getClass().getClassLoader().loadClass(className); - } catch (ClassNotFoundException e) { - LOG.info("Class " + className + " needs to be loaded from a file - " + - path + "."); - // go ahead to load from file system. + ClassLoader cl = null; + if (path == null) { + try { + implClass = getClass().getClassLoader().loadClass(className); + } catch (ClassNotFoundException e) { + throw new IOException("No jar path specified for " + className); + } + } else { + // Have we already loaded the class, perhaps from an earlier region open + // for the same table? + cl = classLoadersCache.get(path); + if (cl != null){ + LOG.debug("Found classloader "+ cl + "for "+path.toString()); + try { + implClass = cl.loadClass(className); + } catch (ClassNotFoundException e) { + LOG.info("Class " + className + " needs to be loaded from a file - " + + path + "."); + // go ahead to load from file system. + } + } } // If not, load @@ -204,7 +237,8 @@ public abstract class CoprocessorHost { // unsurprisingly wants URLs, not URIs; so we will use the deprecated // method which returns URLs for as long as it is available List paths = new ArrayList(); - paths.add(new File(dst.toString()).getCanonicalFile().toURL()); + URL url = new File(dst.toString()).getCanonicalFile().toURL(); + paths.add(url); JarFile jarFile = new JarFile(dst.toString()); Enumeration entries = jarFile.entries(); @@ -221,17 +255,39 @@ public abstract class CoprocessorHost { } jarFile.close(); - ClassLoader cl = new CoprocessorClassLoader(paths, - this.getClass().getClassLoader()); - Thread.currentThread().setContextClassLoader(cl); + cl = new CoprocessorClassLoader(paths, this.getClass().getClassLoader()); + ClassLoader prev = classLoadersCache.putIfAbsent(path, cl); + if (prev != null) { + //lost update race, use already added classloader + cl = prev; + } + try { implClass = cl.loadClass(className); + // cache cp classloader as a weak value, will be GC'ed when no reference left + classLoadersCache.put (path, cl); } catch (ClassNotFoundException e) { + classLoadersCache.remove(path); throw new IOException(e); } + } - return loadInstance(implClass, priority, conf); + //load custom code for coprocessor + Thread currentThread = Thread.currentThread(); + ClassLoader hostClassLoader = currentThread.getContextClassLoader(); + try{ + // switch temporarily to the thread classloader for custom CP + currentThread.setContextClassLoader(cl); + E cpInstance = loadInstance(implClass, priority, conf); + // keep a reference to cached classloader at this instance level + // when no CPH refer the weak value of classloader it will be GC'ed + activeCoprocessorClassLoaders.add(cl); + return cpInstance; + } finally { + // restore the fresh (host) classloader + currentThread.setContextClassLoader(hostClassLoader); + } } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 5069da6..55a7147 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -342,6 +342,7 @@ public class RegionCoprocessorHost } shutdown(env); } + activeCoprocessorClassLoaders.clear(); } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java index dd7fabc..26da050 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.RegionLoad; import javax.tools.*; import java.io.*; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.jar.*; import org.junit.*; @@ -210,16 +211,18 @@ public class TestClassLoading { new Path(fs.getUri().toString() + Path.SEPARATOR)); String jarFileOnHDFS1 = fs.getUri().toString() + Path.SEPARATOR + jarFile1.getName(); + Path pathOnHDFS1 = new Path(jarFileOnHDFS1); assertTrue("Copy jar file to HDFS failed.", - fs.exists(new Path(jarFileOnHDFS1))); + fs.exists(pathOnHDFS1)); LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1); fs.copyFromLocalFile(new Path(jarFile2.getPath()), new Path(fs.getUri().toString() + Path.SEPARATOR)); String jarFileOnHDFS2 = fs.getUri().toString() + Path.SEPARATOR + jarFile2.getName(); + Path pathOnHDFS2 = new Path(jarFileOnHDFS2); assertTrue("Copy jar file to HDFS failed.", - fs.exists(new Path(jarFileOnHDFS2))); + fs.exists(pathOnHDFS2)); LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2); // create a table that references the coprocessors @@ -238,36 +241,55 @@ public class TestClassLoading { } admin.deleteTable(tableName); } - admin.createTable(htd); + CoprocessorHost.clearCacheForTesting(); + byte[] startKey = {10, 63}; + byte[] endKey = {12, 43}; + admin.createTable(htd, startKey, endKey, 4); waitForTable(htd.getName()); // verify that the coprocessors were loaded - boolean found1 = false, found2 = false, found2_k1 = false, - found2_k2 = false, found2_k3 = false; + boolean foundTableRegion=false; + boolean found1 = true, found2 = true, found2_k1 = true, + found2_k2 = true, found2_k3 = true; + Map> regionsActiveClassLoaders = new HashMap>(); MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { if (region.getRegionNameAsString().startsWith(tableName)) { + foundTableRegion = true; CoprocessorEnvironment env; env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1); - if (env != null) { - found1 = true; - } + found1 = found1 && (env != null); env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2); + found2 = found2 && (env != null); if (env != null) { - found2 = true; Configuration conf = env.getConfiguration(); - found2_k1 = conf.get("k1") != null; - found2_k2 = conf.get("k2") != null; - found2_k3 = conf.get("k3") != null; + found2_k1 = found2_k1 && (conf.get("k1") != null); + found2_k2 = found2_k2 && (conf.get("k2") != null); + found2_k3 = found2_k3 && (conf.get("k3") != null); + } else { + found2_k1 = found2_k2 = found2_k3 = false; } + + regionsActiveClassLoaders.put(region, region.getCoprocessorHost().activeCoprocessorClassLoaders); } } + + assertTrue("No region has found for table " + tableName, foundTableRegion); assertTrue("Class " + cpName1 + " was missing on a region", found1); assertTrue("Class " + cpName2 + " was missing on a region", found2); assertTrue("Configuration key 'k1' was missing on a region", found2_k1); assertTrue("Configuration key 'k2' was missing on a region", found2_k2); assertTrue("Configuration key 'k3' was missing on a region", found2_k3); + assertTrue(jarFileOnHDFS1 + " should have been loaded once", + CoprocessorHost.getClassloaderCountForTesting(pathOnHDFS1) == 1); + assertTrue(jarFileOnHDFS2 + " should have been loaded once", + CoprocessorHost.getClassloaderCountForTesting(pathOnHDFS2) == 1); + //check if region active classloaders are shared across all RS regions + Set allClassLoaders = new HashSet(CoprocessorHost.classLoadersCache.values()); + for (Map.Entry> regionCP : regionsActiveClassLoaders.entrySet()) { + assertEquals("CP Classloader for region " + regionCP.getKey() + " is not cached", allClassLoaders, regionCP.getValue()); + } } @Test