Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java (revision 1418599) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java (working copy) @@ -61,6 +61,7 @@ static final String cpName4 = "TestCP4"; static final String cpName5 = "TestCP5"; static final String cpName6 = "TestCP6"; + static final String cpNameInvalid = "TestCPInvalid"; private static Class regionCoprocessor1 = ColumnAggregationEndpoint.class; private static Class regionCoprocessor2 = GenericEndpoint.class; @@ -202,16 +203,18 @@ new Path(fs.getUri().toString() + Path.SEPARATOR)); String jarFileOnHDFS1 = fs.getUri().toString() + Path.SEPARATOR + jarFile1.getName(); + Path pathOnHDFS1 = new Path(jarFileOnHDFS1); assertTrue("Copy jar file to HDFS failed.", - fs.exists(new Path(jarFileOnHDFS1))); + fs.exists(pathOnHDFS1)); LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1); fs.copyFromLocalFile(new Path(jarFile2.getPath()), new Path(fs.getUri().toString() + Path.SEPARATOR)); String jarFileOnHDFS2 = fs.getUri().toString() + Path.SEPARATOR + jarFile2.getName(); + Path pathOnHDFS2 = new Path(jarFileOnHDFS2); assertTrue("Copy jar file to HDFS failed.", - fs.exists(new Path(jarFileOnHDFS2))); + fs.exists(pathOnHDFS2)); LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2); // create a table that references the coprocessors @@ -223,6 +226,9 @@ // with configuration values htd.setValue("COPROCESSOR$2", jarFileOnHDFS2.toString() + "|" + cpName2 + "|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3"); + // invalid class name (should fail to load this class) + htd.setValue("COPROCESSOR$3", jarFileOnHDFS2.toString() + "|" + cpNameInvalid + + "|" + Coprocessor.PRIORITY_USER); HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); if (admin.tableExists(tableName)) { if (admin.isTableEnabled(tableName)) { @@ -230,36 +236,66 @@ } admin.deleteTable(tableName); } - admin.createTable(htd); + CoprocessorHost.classLoadersCache.clear(); + byte[] startKey = {10, 63}; + byte[] endKey = {12, 43}; + admin.createTable(htd, startKey, endKey, 4); waitForTable(htd.getName()); // verify that the coprocessors were loaded - boolean found1 = false, found2 = false, found2_k1 = false, - found2_k2 = false, found2_k3 = false; + boolean foundTableRegion=false; + boolean found_invalid = true, found1 = true, found2 = true, found2_k1 = true, + found2_k2 = true, found2_k3 = true; + Map> regionsActiveClassLoaders = + new HashMap>(); MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { if (region.getRegionNameAsString().startsWith(tableName)) { + foundTableRegion = true; CoprocessorEnvironment env; env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1); - if (env != null) { - found1 = true; - } + found1 = found1 && (env != null); env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2); + found2 = found2 && (env != null); if (env != null) { - found2 = true; Configuration conf = env.getConfiguration(); - found2_k1 = conf.get("k1") != null; - found2_k2 = conf.get("k2") != null; - found2_k3 = conf.get("k3") != null; + found2_k1 = found2_k1 && (conf.get("k1") != null); + found2_k2 = found2_k2 && (conf.get("k2") != null); + found2_k3 = found2_k3 && (conf.get("k3") != null); + } else { + found2_k1 = found2_k2 = found2_k3 = false; } + env = region.getCoprocessorHost().findCoprocessorEnvironment(cpNameInvalid); + found_invalid = found_invalid && (env != null); + + regionsActiveClassLoaders.put(region, + region.getCoprocessorHost().activeCoprocessorClassLoaders); } } + + assertTrue("No region was found for table " + tableName, foundTableRegion); assertTrue("Class " + cpName1 + " was missing on a region", found1); assertTrue("Class " + cpName2 + " was missing on a region", found2); + //an invalid CP class name is defined for this table, validate that it is not loaded + assertFalse("Class " + cpNameInvalid + " was found on a region", found_invalid); assertTrue("Configuration key 'k1' was missing on a region", found2_k1); assertTrue("Configuration key 'k2' was missing on a region", found2_k2); assertTrue("Configuration key 'k3' was missing on a region", found2_k3); + // check if CP classloaders are cached + assertTrue(jarFileOnHDFS1 + " was not cached", + CoprocessorHost.classLoadersCache.containsKey(pathOnHDFS1)); + assertTrue(jarFileOnHDFS2 + " was not cached", + CoprocessorHost.classLoadersCache.containsKey(pathOnHDFS2)); + //check if region active classloaders are shared across all RS regions + Set allClassLoaders = new HashSet( + CoprocessorHost.classLoadersCache.values()); + for (Map.Entry> regionCP : regionsActiveClassLoaders.entrySet()) { + assertTrue("Some CP classloaders for region " + regionCP.getKey() + " are not cached." + + " ClassLoader Cache:" + allClassLoaders + + " Region ClassLoaders:" + regionCP.getValue(), + allClassLoaders.containsAll(regionCP.getValue())); + } } @Test Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1418599) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -342,6 +342,7 @@ } shutdown(env); } + activeCoprocessorClassLoaders.clear(); } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1418599) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor; +import com.google.common.collect.MapMaker; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; @@ -48,6 +49,7 @@ import java.io.IOException; import java.net.URL; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.jar.JarEntry; import java.util.jar.JarFile; @@ -78,6 +80,11 @@ protected String pathPrefix; protected volatile int loadSequence; + // classloader caching + protected Set activeCoprocessorClassLoaders = new HashSet(); + static ConcurrentMap classLoadersCache = new MapMaker() + .concurrencyLevel(3).weakValues().makeMap(); + public CoprocessorHost() { pathPrefix = UUID.randomUUID().toString(); } @@ -165,14 +172,27 @@ LOG.debug("Loading coprocessor class " + className + " with path " + path + " and priority " + priority); - // Have we already loaded the class, perhaps from an earlier region open - // for the same table? - try { - implClass = getClass().getClassLoader().loadClass(className); - } catch (ClassNotFoundException e) { - LOG.info("Class " + className + " needs to be loaded from a file - " + - path + "."); - // go ahead to load from file system. + ClassLoader cl = null; + if (path == null) { + try { + implClass = getClass().getClassLoader().loadClass(className); + } catch (ClassNotFoundException e) { + throw new IOException("No jar path specified for " + className); + } + } else { + // Have we already loaded the class, perhaps from an earlier region open + // for the same table? + cl = classLoadersCache.get(path); + if (cl != null){ + LOG.debug("Found classloader "+ cl + "for "+path.toString()); + try { + implClass = cl.loadClass(className); + } catch (ClassNotFoundException e) { + LOG.info("Class " + className + " needs to be loaded from a file - " + + path + "."); + // go ahead to load from file system. + } + } } // If not, load @@ -204,7 +224,8 @@ // unsurprisingly wants URLs, not URIs; so we will use the deprecated // method which returns URLs for as long as it is available List paths = new ArrayList(); - paths.add(new File(dst.toString()).getCanonicalFile().toURL()); + URL url = new File(dst.toString()).getCanonicalFile().toURL(); + paths.add(url); JarFile jarFile = new JarFile(dst.toString()); Enumeration entries = jarFile.entries(); @@ -221,17 +242,36 @@ } jarFile.close(); - ClassLoader cl = new CoprocessorClassLoader(paths, - this.getClass().getClassLoader()); - Thread.currentThread().setContextClassLoader(cl); + cl = new CoprocessorClassLoader(paths, this.getClass().getClassLoader()); + try { implClass = cl.loadClass(className); + // cache cp classloader as a weak value, will be GC'ed when no reference left + ClassLoader prev = classLoadersCache.putIfAbsent(path, cl); + if (prev != null) { + //lost update race, use already added classloader + cl = prev; + } } catch (ClassNotFoundException e) { - throw new IOException(e); + throw new IOException("Cannot load external coprocessor class " + className, e); } } - return loadInstance(implClass, priority, conf); + //load custom code for coprocessor + Thread currentThread = Thread.currentThread(); + ClassLoader hostClassLoader = currentThread.getContextClassLoader(); + try{ + // switch temporarily to the thread classloader for custom CP + currentThread.setContextClassLoader(cl); + E cpInstance = loadInstance(implClass, priority, conf); + // keep a reference to cached classloader at this instance level + // when no CPH refer the weak value of classloader it will be GC'ed + activeCoprocessorClassLoaders.add(cl); + return cpInstance; + } finally { + // restore the fresh (host) classloader + currentThread.setContextClassLoader(hostClassLoader); + } } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorClassLoader.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorClassLoader.java (revision 1418599) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorClassLoader.java (working copy) @@ -65,9 +65,13 @@ "org.w3c", "org.xml", "sunw.", - // Hadoop/HBase: + // logging + "org.apache.commons.logging", + "org.apache.log4j", + "com.hadoop", + // Hadoop/HBase/ZK: "org.apache.hadoop", - "com.hadoop", + "org.apache.zookeeper", }; /** @@ -80,16 +84,25 @@ new Pattern[] { Pattern.compile("^[^-]+-default\\.xml$") }; - + /** + * Parent classloader used to load any class not matching the exemption list. + */ + private ClassLoader parent; + + /** * Creates a CoprocessorClassLoader that loads classes from the given paths. * @param paths paths from which to load classes. * @param parent the parent ClassLoader to set. */ public CoprocessorClassLoader(List paths, ClassLoader parent) { super(paths.toArray(new URL[]{}), parent); + this.parent = parent; + if (parent == null) { + throw new IllegalArgumentException("No parent classloader!"); + } } - + @Override synchronized public Class loadClass(String name) throws ClassNotFoundException { @@ -99,9 +112,9 @@ LOG.debug("Skipping exempt class " + name + " - delegating directly to parent"); } - return super.loadClass(name); + return parent.loadClass(name); } - + // Check whether the class has already been loaded: Class clasz = findLoadedClass(name); if (clasz != null) { @@ -123,7 +136,7 @@ LOG.debug("Class " + name + " not found - delegating to parent"); } try { - clasz = super.loadClass(name); + clasz = parent.loadClass(name); } catch (ClassNotFoundException e2) { // Class not found in this ClassLoader or in the parent ClassLoader // Log some debug output before rethrowing ClassNotFoundException