# NOTE(review): this copy of the patch had its line breaks collapsed and its
# generic type arguments stripped. Line structure was rebuilt from the hunk
# line counts; leading whitespace and the type arguments (Class<?>,
# String/Object) are reconstructions inferred from usage -- confirm against
# the original attachment, and apply with `patch -l` if context whitespace
# differs.
# NOTE(review): sharedDataMap is a non-static field of RegionCoprocessorHost,
# so data is shared only between environments created by the same host --
# confirm that is the intended scope of "shared data between all instances".
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1369170)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy)
@@ -20,21 +20,37 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -45,9 +61,7 @@
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Implements the coprocessor environment and runtime support for coprocessors
@@ -57,6 +71,8 @@
     extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
 
   private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
+  private ConcurrentMap<Class<?>, ConcurrentMap<String, Object>> sharedDataMap =
+      new ConcurrentHashMap<Class<?>, ConcurrentMap<String, Object>>();
 
   /**
    * Encapsulation of the environment of each coprocessor
@@ -66,6 +82,7 @@
 
     private HRegion region;
     private RegionServerServices rsServices;
+    private ConcurrentMap<String, Object> sharedData;
 
     /**
      * Constructor
@@ -74,10 +91,11 @@
      */
     public RegionEnvironment(final Coprocessor impl, final int priority,
         final int seq, final Configuration conf, final HRegion region,
-        final RegionServerServices services) {
+        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
       super(impl, priority, seq, conf);
       this.region = region;
       this.rsServices = services;
+      this.sharedData = sharedData;
     }
 
     /** @return the region */
@@ -95,6 +113,11 @@
     public void shutdown() {
       super.shutdown();
     }
+
+    @Override
+    public ConcurrentMap<String, Object> getSharedData() {
+      return sharedData;
+    }
   }
 
   /** The region server services */
@@ -126,6 +149,12 @@
     loadTableCoprocessors(conf);
   }
 
+  protected boolean removeEnvironment(CoprocessorEnvironment env) {
+    // cleanup the sharedDataMap
+    sharedDataMap.remove(env.getInstance().getClass());
+    return super.removeEnvironment(env);
+  }
+
   void loadTableCoprocessors(final Configuration conf) {
     // scan the table attributes for coprocessor load specifications
     // initialize the coprocessors
@@ -194,8 +223,15 @@
         break;
       }
     }
+    ConcurrentMap<String, Object> classData = sharedDataMap.get(implClass);
+    if (classData == null) {
+      classData = new ConcurrentHashMap<String, Object>();
+      ConcurrentMap<String, Object> tmp = sharedDataMap.putIfAbsent(implClass, classData);
+      if (tmp != null)
+        classData = tmp;
+    }
     return new RegionEnvironment(instance, priority, seq, conf, region,
-        rsServices);
+        rsServices, classData);
   }
 
   @Override
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1369170)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy)
@@ -81,6 +81,10 @@
     pathPrefix = UUID.randomUUID().toString();
   }
 
+  protected boolean removeEnvironment(CoprocessorEnvironment env) {
+    return coprocessors.remove(env);
+  }
+
   /**
    * Not to be confused with the per-object _coprocessors_ (above),
    * coprocessorNames is static and stores the set of all coprocessors ever
@@ -708,7 +712,7 @@
     } else {
       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
           "environment because it threw: " + e,e);
-      coprocessors.remove(env);
+      removeEnvironment(env);
       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
           "' threw: '" + e + "' and has been removed" + "from the active " +
           "coprocessor set.", e);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (revision 1369170)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (working copy)
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -35,4 +37,7 @@
   /** @return reference to the region server services */
   public RegionServerServices getRegionServerServices();
 
+  /** @return shared data between all instances of this coprocessor */
+  public ConcurrentMap<String, Object> getSharedData();
+
 }