Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (revision 1369426)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (working copy)
@@ -25,13 +25,18 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.collections.map.AbstractReferenceMap;
+import org.apache.commons.collections.map.ReferenceMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -41,6 +46,8 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -49,6 +56,7 @@
 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.junit.experimental.categories.Category;
@@ -132,14 +140,19 @@
     private boolean postFlushCalled;
     private boolean preSplitCalled;
     private boolean postSplitCalled;
+    private ConcurrentMap<String, Object> sharedData;
 
     @Override
     public void start(CoprocessorEnvironment e) {
+      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
+      // using a new Object here, so that there is a new object on each invocation
+      sharedData.putIfAbsent("test1", new Object());
       startCalled = true;
     }
 
     @Override
     public void stop(CoprocessorEnvironment e) {
+      sharedData = null;
       stopCalled = true;
     }
 
@@ -214,15 +227,113 @@
     boolean wasSplit() {
       return (preSplitCalled && postSplitCalled);
     }
+    Map<String, Object> getSharedData() {
+      return sharedData;
+    }
   }
 
+  public static class CoprocessorII extends BaseRegionObserver {
+    private ConcurrentMap<String, Object> sharedData;
+    @Override
+    public void start(CoprocessorEnvironment e) {
+      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
+      sharedData.putIfAbsent("test2", new Object());
+    }
+    @Override
+    public void stop(CoprocessorEnvironment e) {
+      sharedData = null;
+    }
+    @Override
+    public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Get get, final List<KeyValue> results) throws IOException {
+      // deliberately throw an ArithmeticException so the host removes this coprocessor
+      if (1/0 == 1) {
+        e.complete();
+      }
+    }
+
+    Map<String, Object> getSharedData() {
+      return sharedData;
+    }
+  }
+
+  public void testSharedData() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [][] families = { fam1, fam2, fam3 };
+
+    Configuration hc = initSplit();
+    HRegion region = initHRegion(tableName, getName(), hc,
+        new Class<?>[]{}, families);
+
+    for (int i = 0; i < 3; i++) {
+      addContent(region, fam3);
+      region.flushcache();
+    }
+
+    region.compactStores();
+
+    byte [] splitRow = region.checkSplit();
+
+    assertNotNull(splitRow);
+    HRegion [] regions = split(region, splitRow);
+    for (int i = 0; i < regions.length; i++) {
+      regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
+    }
+    Coprocessor c = regions[0].getCoprocessorHost().
+        findCoprocessor(CoprocessorImpl.class.getName());
+    Coprocessor c2 = regions[0].getCoprocessorHost().
+        findCoprocessor(CoprocessorII.class.getName());
+    Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
+    Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
+    assertNotNull(o);
+    assertNotNull(o2);
+    // the two coprocessors get different sharedDatas
+    assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
+    for (int i = 1; i < regions.length; i++) {
+      c = regions[i].getCoprocessorHost().
+          findCoprocessor(CoprocessorImpl.class.getName());
+      c2 = regions[i].getCoprocessorHost().
+          findCoprocessor(CoprocessorII.class.getName());
+      // make sure that all instances of a coprocessor class share the same sharedData
+      assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+      assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
+    }
+    // now have all Environments fail
+    for (int i = 0; i < regions.length; i++) {
+      try {
+        Get g = new Get(regions[i].getStartKey());
+        regions[i].get(g, null);
+        fail();
+      } catch (DoNotRetryIOException xc) {
+      }
+      assertNull(regions[i].getCoprocessorHost().
+          findCoprocessor(CoprocessorII.class.getName()));
+    }
+    c = regions[0].getCoprocessorHost().
+        findCoprocessor(CoprocessorImpl.class.getName());
+    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+    c = c2 = null;
+    // perform a GC
+    System.gc();
+    // reopen the region
+    region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
+    c = region.getCoprocessorHost().
+        findCoprocessor(CoprocessorImpl.class.getName());
+    // CPimpl is unaffected, still the same reference
+    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+    c2 = region.getCoprocessorHost().
+        findCoprocessor(CoprocessorII.class.getName());
+    // a new map and object were created, so the reference is different;
+    // hence the old entry was indeed removed by the GC and a new one has been created
+    assertFalse(((CoprocessorII)c2).getSharedData().get("test2") == o2);
+  }
+
   public void testCoprocessorInterface() throws IOException {
     byte [] tableName = Bytes.toBytes("testtable");
     byte [][] families = { fam1, fam2, fam3 };
 
     Configuration hc = initSplit();
     HRegion region = initHRegion(tableName, getName(), hc,
-      CoprocessorImpl.class, families);
+      new Class<?>[]{CoprocessorImpl.class}, families);
     for (int i = 0; i < 3; i++) {
       addContent(region, fam3);
       region.flushcache();
@@ -268,7 +379,7 @@
     }
   }
 
-  HRegion reopenRegion(final HRegion closedRegion, Class implClass)
+  HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
     throws IOException {
     //HRegionInfo info = new HRegionInfo(tableName, null, null, false);
     HRegion r = new HRegion(closedRegion);
@@ -281,7 +392,9 @@
     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
     r.setCoprocessorHost(host);
-    host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+    for (Class<?> implClass : implClasses) {
+      host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+    }
 
     // we need to manually call pre- and postOpen here since the
     // above load() is not the real case for CP loading. A CP is
     // expected to be loaded by default from 1) configuration; or 2)
@@ -294,7 +407,7 @@
   }
 
   HRegion initHRegion (byte [] tableName, String callingMethod,
-      Configuration conf, Class implClass, byte [] ... families)
+      Configuration conf, Class<?> [] implClasses, byte [][] families)
     throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName);
     for(byte [] family : families) {
@@ -308,11 +421,12 @@
 
     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
     r.setCoprocessorHost(host);
-    host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+    for (Class<?> implClass : implClasses) {
+      host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+      Coprocessor c = host.findCoprocessor(implClass.getName());
+      assertNotNull(c);
+    }
 
-    Coprocessor c = host.findCoprocessor(implClass.getName());
-    assertNotNull(c);
-
     // Here we have to call pre and postOpen explicitly.
     host.preOpen();
     host.postOpen();
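
The GC behavior asserted at the end of testSharedData() comes from the commons-collections ReferenceMap that RegionCoprocessorHost uses below: hard keys and weak values, so an entry survives only while something still strongly references the value. A minimal standalone sketch of that semantics (class name, key, and the sleep are illustrative only, not part of the patch):

    import org.apache.commons.collections.map.AbstractReferenceMap;
    import org.apache.commons.collections.map.ReferenceMap;

    public class ReferenceMapSketch {
      public static void main(String[] args) throws InterruptedException {
        // hard keys, weak values -- the same constants the patch uses for sharedDataMap
        ReferenceMap map = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
        Object classData = new Object();
        map.put("CoprocessorII", classData);
        System.out.println(map.get("CoprocessorII") != null); // true: value still strongly held

        classData = null;  // drop the last strong reference, as when every environment is gone
        System.gc();       // only a hint, the same caveat as System.gc() in the test
        Thread.sleep(100);
        System.out.println(map.get("CoprocessorII")); // typically null once the value is collected
      }
    }
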
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1369426)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy)
@@ -20,21 +20,39 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+
+import org.apache.commons.collections.map.AbstractReferenceMap;
+import org.apache.commons.collections.map.ReferenceMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -45,9 +63,7 @@
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Implements the coprocessor environment and runtime support for coprocessors
@@ -57,6 +73,9 @@
     extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
 
   private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
+  // The shared data map
+  private static ReferenceMap sharedDataMap =
+      new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
 
   /**
    * Encapsulation of the environment of each coprocessor
@@ -66,6 +85,7 @@
 
     private HRegion region;
     private RegionServerServices rsServices;
+    ConcurrentMap<String, Object> sharedData;
 
    /**
     * Constructor
@@ -74,10 +94,11 @@
     */
    public RegionEnvironment(final Coprocessor impl, final int priority,
        final int seq, final Configuration conf, final HRegion region,
-        final RegionServerServices services) {
+        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
      super(impl, priority, seq, conf);
      this.region = region;
      this.rsServices = services;
+      this.sharedData = sharedData;
    }
 
    /** @return the region */
@@ -95,6 +116,11 @@
    public void shutdown() {
      super.shutdown();
    }
+
+    @Override
+    public ConcurrentMap<String, Object> getSharedData() {
+      return sharedData;
+    }
   }
 
   /** The region server services */
@@ -194,8 +220,19 @@
         break;
       }
     }
+    ConcurrentMap<String, Object> classData;
+    // make sure only one thread can add maps
+    synchronized (sharedDataMap) {
+      // as long as at least one RegionEnvironment holds on to its classData it will
+      // remain in this map
+      classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
+      if (classData == null) {
+        classData = new ConcurrentHashMap<String, Object>();
+        sharedDataMap.put(implClass.getName(), classData);
+      }
+    }
     return new RegionEnvironment(instance, priority, seq, conf, region,
-        rsServices);
+        rsServices, classData);
   }
 
   @Override
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (revision 1369426)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (working copy)
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -35,4 +37,7 @@
 
   /** @return reference to the region server services */
   public RegionServerServices getRegionServerServices();
+
+  /** @return shared data between all instances of this coprocessor */
+  public ConcurrentMap<String, Object> getSharedData();
 }
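
To show how the new interface method is meant to be consumed, here is a hypothetical observer (class name, key, and counter are illustrative, not part of this patch), written against the same preGet signature the test above overrides:

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

    public class SharedGetCounterObserver extends BaseRegionObserver {
      private ConcurrentMap<String, Object> sharedData;

      @Override
      public void start(CoprocessorEnvironment e) {
        // one shared map per coprocessor class per region server, handed out by the host
        sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
        sharedData.putIfAbsent("getCount", new AtomicLong(0));
      }

      @Override
      public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
          final Get get, final List<KeyValue> results) throws IOException {
        // counts Gets across every region on this server that loads the observer
        ((AtomicLong) sharedData.get("getCount")).incrementAndGet();
      }

      @Override
      public void stop(CoprocessorEnvironment e) {
        // drop our reference so the host's weak-valued map can reclaim the shared map
        // once the last environment of this class goes away
        sharedData = null;
      }
    }
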
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1369426)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy)
@@ -279,6 +279,11 @@
   public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
       int priority, int sequence, Configuration conf);
 
+  protected boolean removeEnvironment(final CoprocessorEnvironment env) {
+    return coprocessors.remove(env);
+  }
+
   public void shutdown(CoprocessorEnvironment e) {
     if (e instanceof Environment) {
       ((Environment)e).shutdown();
     }
@@ -708,7 +713,7 @@
       } else {
         LOG.error("Removing coprocessor '" + env.toString() + "' from " +
             "environment because it threw: " + e,e);
-        coprocessors.remove(env);
+        removeEnvironment(env);
         throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
             "' threw: '" + e + "' and has been removed" + "from the active " +
            "coprocessor set.", e);
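
The client-visible effect of the removeEnvironment() path above, sketched under the assumption that the DoNotRetryIOException reaches the caller without retries (table name and row key are illustrative, not from the patch):

    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    try {
      table.get(new Get(Bytes.toBytes("somerow")));  // first call still runs the faulty coprocessor
    } catch (DoNotRetryIOException e) {
      // the offending coprocessor environment has been removed from the region's host;
      // subsequent gets on this region no longer invoke it
    } finally {
      table.close();
    }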