diff --git src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon
index abeb850..43832c8 100644
--- src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon
+++ src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon
@@ -99,6 +99,12 @@ org.apache.hadoop.hbase.HTableDescriptor;
| Fragmentation | <% frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %> | Overall fragmentation of all tables, including .META. and -ROOT-. |
%if>
| Zookeeper Quorum | <% master.getZooKeeperWatcher().getQuorum() %> | Addresses of all registered ZK servers. For more, see zk dump. |
+
+ |
+ Coprocessors | <% java.util.Arrays.toString(master.getCoprocessors()) %>
+ |
+ Coprocessors currently loaded by the master |
+
<& ../common/TaskMonitorTmpl; filter = filter &>
diff --git src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon
index be6fceb..1d7860f 100644
--- src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon
+++ src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon
@@ -77,6 +77,13 @@ org.apache.hadoop.hbase.HRegionInfo;
| HBase Compiled | <% org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <% org.apache.hadoop.hbase.util.VersionInfo.getUser() %> | When HBase version was compiled and by whom |
| Metrics | <% metrics.toString() %> | RegionServer Metrics; file and heap sizes are in megabytes |
| Zookeeper Quorum | <% regionServer.getZooKeeper().getQuorum() %> | Addresses of all registered ZK servers |
+
+ | Coprocessors |
+
+ <% java.util.Arrays.toString(regionServer.getCoprocessors()) %>
+ |
+ Coprocessors currently loaded by this regionserver |
+
<& ../common/TaskMonitorTmpl; filter = filter &>
diff --git src/main/java/org/apache/hadoop/hbase/ClusterStatus.java src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
index 01bc1dd..f64a54d 100644
--- src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
+++ src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
@@ -24,6 +24,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -66,6 +67,7 @@ public class ClusterStatus extends VersionedWritable {
private Collection deadServers;
private Map intransition;
private String clusterId;
+ private String[] masterCoprocessors;
/**
* Constructor, for Writable
@@ -76,12 +78,14 @@ public class ClusterStatus extends VersionedWritable {
public ClusterStatus(final String hbaseVersion, final String clusterid,
final Map servers,
- final Collection deadServers, final Map rit) {
+ final Collection deadServers, final Map rit,
+ final String[] masterCoprocessors) {
this.hbaseVersion = hbaseVersion;
this.liveServers = servers;
this.deadServers = deadServers;
this.intransition = rit;
this.clusterId = clusterid;
+ this.masterCoprocessors = masterCoprocessors;
}
/**
@@ -155,7 +159,8 @@ public class ClusterStatus extends VersionedWritable {
return (getVersion() == ((ClusterStatus)o).getVersion()) &&
getHBaseVersion().equals(((ClusterStatus)o).getHBaseVersion()) &&
this.liveServers.equals(((ClusterStatus)o).liveServers) &&
- deadServers.equals(((ClusterStatus)o).deadServers);
+ deadServers.equals(((ClusterStatus)o).deadServers) &&
+ Arrays.equals(this.masterCoprocessors, ((ClusterStatus)o).masterCoprocessors);
}
/**
@@ -205,6 +210,10 @@ public class ClusterStatus extends VersionedWritable {
return clusterId;
}
+ public String[] getMasterCoprocessors() {
+ return masterCoprocessors;
+ }
+
//
// Writable
//
@@ -227,6 +236,10 @@ public class ClusterStatus extends VersionedWritable {
e.getValue().write(out);
}
out.writeUTF(clusterId);
+ out.writeInt(masterCoprocessors.length);
+ for(String masterCoprocessor: masterCoprocessors) {
+ out.writeUTF(masterCoprocessor);
+ }
}
public void readFields(DataInput in) throws IOException {
@@ -254,5 +267,10 @@ public class ClusterStatus extends VersionedWritable {
this.intransition.put(key, regionState);
}
this.clusterId = in.readUTF();
+ int masterCoprocessorsLength = in.readInt();
+ masterCoprocessors = new String[masterCoprocessorsLength];
+ for(int i = 0; i < masterCoprocessorsLength; i++) {
+ masterCoprocessors[i] = in.readUTF();
+ }
}
-}
\ No newline at end of file
+}
diff --git src/main/java/org/apache/hadoop/hbase/HServerLoad.java src/main/java/org/apache/hadoop/hbase/HServerLoad.java
index 0c680e4..ee1c6c8 100644
--- src/main/java/org/apache/hadoop/hbase/HServerLoad.java
+++ src/main/java/org/apache/hadoop/hbase/HServerLoad.java
@@ -22,9 +22,12 @@ package org.apache.hadoop.hbase;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
@@ -55,6 +58,33 @@ implements WritableComparable {
/** the maximum allowable size of the heap, in MB */
private int maxHeapMB = 0;
+ // Regionserver-level coprocessors, e.g., WALObserver implementations.
+ // Region-level coprocessors, on the other hand, are stored inside RegionLoad
+ // objects.
+ private Set coprocessors =
+ new TreeSet();
+
+ /**
+ * HBASE-4070: Improve region server metrics to report loaded coprocessors.
+ *
+ * @return Returns the set of all coprocessors on this
+ * regionserver, where this set is the union of the
+ * regionserver-level coprocessors on one hand, and all of the region-level
+ * coprocessors, on the other.
+ *
+ * We must iterate through all regions loaded on this regionserver to
+ * obtain all of the region-level coprocessors.
+ */
+ public String[] getCoprocessors() {
+ TreeSet returnValue = new TreeSet(coprocessors);
+ for (Map.Entry rls: getRegionsLoad().entrySet()) {
+ for (String coprocessor: rls.getValue().getCoprocessors()) {
+ returnValue.add(coprocessor);
+ }
+ }
+ return returnValue.toArray(new String[0]);
+ }
+
/** per-region load metrics */
private Map regionLoad =
new TreeMap(Bytes.BYTES_COMPARATOR);
@@ -114,6 +144,10 @@ implements WritableComparable {
*/
private int totalStaticBloomSizeKB;
+ // Region-level coprocessors.
+ Set coprocessors =
+ new TreeSet();
+
/**
* Constructor, for Writable
*/
@@ -133,6 +167,7 @@ implements WritableComparable {
* @param writeRequestsCount
* @param totalCompactingKVs
* @param currentCompactedKVs
+ * @param coprocessors
*/
public RegionLoad(final byte[] name, final int stores,
final int storefiles, final int storeUncompressedSizeMB,
@@ -141,7 +176,8 @@ implements WritableComparable {
final int rootIndexSizeKB, final int totalStaticIndexSizeKB,
final int totalStaticBloomSizeKB,
final int readRequestsCount, final int writeRequestsCount,
- final long totalCompactingKVs, final long currentCompactedKVs) {
+ final long totalCompactingKVs, final long currentCompactedKVs,
+ final Set coprocessors) {
this.name = name;
this.stores = stores;
this.storefiles = storefiles;
@@ -156,9 +192,13 @@ implements WritableComparable {
this.writeRequestsCount = writeRequestsCount;
this.totalCompactingKVs = totalCompactingKVs;
this.currentCompactedKVs = currentCompactedKVs;
+ this.coprocessors = coprocessors;
}
// Getters
+ private String[] getCoprocessors() {
+ return coprocessors.toArray(new String[0]);
+ }
/**
* @return the region name
@@ -332,6 +372,11 @@ implements WritableComparable {
this.totalStaticBloomSizeKB = in.readInt();
this.totalCompactingKVs = in.readLong();
this.currentCompactedKVs = in.readLong();
+ int coprocessorsSize = in.readInt();
+ coprocessors = new TreeSet();
+ for (int i = 0; i < coprocessorsSize; i++) {
+ coprocessors.add(in.readUTF());
+ }
}
public void write(DataOutput out) throws IOException {
@@ -352,6 +397,10 @@ implements WritableComparable {
out.writeInt(totalStaticBloomSizeKB);
out.writeLong(totalCompactingKVs);
out.writeLong(currentCompactedKVs);
+ out.writeInt(coprocessors.size());
+ for (String coprocessor: coprocessors) {
+ out.writeUTF(coprocessor);
+ }
}
/**
@@ -397,6 +446,11 @@ implements WritableComparable {
}
sb = Strings.appendKeyValue(sb, "compactionProgressPct",
compactionProgressPct);
+ String coprocessors = Arrays.toString(getCoprocessors());
+ if (coprocessors != null) {
+ sb = Strings.appendKeyValue(sb, "coprocessors",
+ Arrays.toString(getCoprocessors()));
+ }
return sb.toString();
}
}
@@ -424,15 +478,18 @@ implements WritableComparable {
* @param numberOfRequests
* @param usedHeapMB
* @param maxHeapMB
+ * @param coprocessors : coprocessors loaded at the regionserver-level
*/
public HServerLoad(final int totalNumberOfRequests,
final int numberOfRequests, final int usedHeapMB, final int maxHeapMB,
- final Map regionLoad) {
+ final Map regionLoad,
+ final Set coprocessors) {
this.numberOfRequests = numberOfRequests;
this.usedHeapMB = usedHeapMB;
this.maxHeapMB = maxHeapMB;
this.regionLoad = regionLoad;
this.totalNumberOfRequests = totalNumberOfRequests;
+ this.coprocessors = coprocessors;
}
/**
@@ -441,7 +498,7 @@ implements WritableComparable {
*/
public HServerLoad(final HServerLoad hsl) {
this(hsl.totalNumberOfRequests, hsl.numberOfRequests, hsl.usedHeapMB,
- hsl.maxHeapMB, hsl.getRegionsLoad());
+ hsl.maxHeapMB, hsl.getRegionsLoad(), hsl.coprocessors);
for (Map.Entry e : hsl.regionLoad.entrySet()) {
this.regionLoad.put(e.getKey(), e.getValue());
}
@@ -487,6 +544,10 @@ implements WritableComparable {
sb = Strings.appendKeyValue(sb, "usedHeapMB",
Integer.valueOf(this.usedHeapMB));
sb = Strings.appendKeyValue(sb, "maxHeapMB", Integer.valueOf(maxHeapMB));
+ String coprocessors = Arrays.toString(getCoprocessors());
+ if (coprocessors != null) {
+ sb = Strings.appendKeyValue(sb, "coprocessors", coprocessors);
+ }
return sb.toString();
}
@@ -607,6 +668,10 @@ implements WritableComparable {
regionLoad.put(rl.getName(), rl);
}
totalNumberOfRequests = in.readInt();
+ int coprocessorsSize = in.readInt();
+ for(int i = 0; i < coprocessorsSize; i++) {
+ coprocessors.add(in.readUTF());
+ }
}
public void write(DataOutput out) throws IOException {
@@ -619,6 +684,10 @@ implements WritableComparable {
for (RegionLoad rl: regionLoad.values())
rl.write(out);
out.writeInt(totalNumberOfRequests);
+ out.writeInt(coprocessors.size());
+ for (String coprocessor: coprocessors) {
+ out.writeUTF(coprocessor);
+ }
}
// Comparable
diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 92c959c..e10d7e9 100644
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1647,4 +1647,13 @@ public class HBaseAdmin implements Abortable, Closeable {
sn.getHostname(), sn.getPort());
return rs.rollHLogWriter();
}
+
+ public String[] getMasterCoprocessors() {
+ try {
+ return getClusterStatus().getMasterCoprocessors();
+ } catch (IOException e) {
+ LOG.error("Could not getClusterStatus()",e);
+ return null;
+ }
+ }
}
diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 7d2f82e..d5d555c 100644
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -73,6 +73,14 @@ public abstract class CoprocessorHost {
pathPrefix = UUID.randomUUID().toString();
}
+ /**
+ * Not to be confused with the per-object _coprocessors_ (above),
+ * coprocessorNames is static and stores the set of all coprocessors ever
+ * loaded by any thread in this JVM. It is strictly additive: coprocessors are
+ * added to coprocessorNames, by loadInstance() but are never removed, since
+ * the intention is to preserve a history of all loaded coprocessors for
+ * diagnosis in case of server crash (HBASE-4014).
+ */
private static Set coprocessorNames =
Collections.synchronizedSet(new HashSet());
public static Set getLoadedCoprocessors() {
@@ -80,6 +88,21 @@ public abstract class CoprocessorHost {
}
/**
+ * Used to create a parameter to the HServerLoad constructor so that
+ * HServerLoad can provide information about the coprocessors loaded by this
+ * regionserver.
+ * (HBASE-4070: Improve region server metrics to report loaded coprocessors
+ * to master).
+ */
+ public Set getCoprocessors() {
+ Set returnValue = new TreeSet();
+ for(CoprocessorEnvironment e: coprocessors) {
+ returnValue.add(e.getInstance().getClass().getSimpleName());
+ }
+ return returnValue;
+ }
+
+ /**
* Load system coprocessors. Read the class names from configuration.
* Called by constructor.
*/
diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 50b49a6..7c1c24b 100644
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -345,7 +346,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
}
/**
- * Initilize all ZK based system trackers.
+ * Initialize all ZK based system trackers.
* @throws IOException
* @throws InterruptedException
*/
@@ -1137,7 +1138,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.fileSystemManager.getClusterId(),
this.serverManager.getOnlineServers(),
this.serverManager.getDeadServers(),
- this.assignmentManager.getRegionsInTransition());
+ this.assignmentManager.getRegionsInTransition(),
+ this.getCoprocessors());
}
public String getClusterId() {
@@ -1155,6 +1157,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return CoprocessorHost.getLoadedCoprocessors().toString();
}
+ /**
+ * @return array of coprocessor SimpleNames.
+ */
+ public String[] getCoprocessors() {
+ Set masterCoprocessors =
+ getCoprocessorHost().getCoprocessors();
+ return masterCoprocessors.toArray(new String[0]);
+ }
+
@Override
public void abort(final String msg, final Throwable t) {
if (cpHost != null) {
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e2e694a..effcb6e 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -795,7 +795,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
return new HServerLoad(requestCount.get(),(int)metrics.getRequests(),
(int)(memory.getUsed() / 1024 / 1024),
- (int) (memory.getMax() / 1024 / 1024), regionLoads);
+ (int) (memory.getMax() / 1024 / 1024), regionLoads,
+ this.hlog.getCoprocessorHost().getCoprocessors());
}
String getOnlineRegionsAsPrintableString() {
@@ -994,7 +995,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
totalStaticIndexSizeKB, totalStaticBloomSizeKB,
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
- totalCompactingKVs, currentCompactedKVs);
+ totalCompactingKVs, currentCompactedKVs,
+ r.getCoprocessorHost().getCoprocessors());
}
/**
@@ -3277,4 +3279,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
HLog wal = this.getWAL();
return wal.rollWriter(true);
}
+
+ // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
+ public String[] getCoprocessors() {
+ HServerLoad hsl = buildServerLoad();
+ return hsl == null? null: hsl.getCoprocessors();
+ }
}
diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
index eda5a9b..cdbba7e 100644
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
@@ -25,12 +25,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -38,10 +41,12 @@ import org.apache.hadoop.fs.Path;
import javax.tools.*;
import java.io.*;
import java.util.*;
+import java.util.Arrays;
import java.util.jar.*;
import org.junit.*;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@@ -63,10 +68,44 @@ public class TestClassLoading {
static final String cpName4 = "TestCP4";
static final String cpName5 = "TestCP5";
+ private static Class regionCoprocessor1 = ColumnAggregationEndpoint.class;
+ private static Class regionCoprocessor2 = GenericEndpoint.class;
+ private static Class regionServerCoprocessor = SampleRegionWALObserver.class;
+ private static Class masterCoprocessor = BaseMasterObserver.class;
+
+ private static final String[] regionServerSystemCoprocessors =
+ new String[]{
+ regionCoprocessor1.getSimpleName(),
+ regionServerCoprocessor.getSimpleName()
+ };
+
+ private static final String[] regionServerSystemAndUserCoprocessors =
+ new String[] {
+ regionCoprocessor1.getSimpleName(),
+ regionCoprocessor2.getSimpleName(),
+ regionServerCoprocessor.getSimpleName()
+ };
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniCluster(1);
conf = TEST_UTIL.getConfiguration();
+
+ // regionCoprocessor1 will be loaded on all regionservers, since it is
+ // loaded for any tables (user or meta).
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ regionCoprocessor1.getName());
+
+ // regionCoprocessor2 will be loaded only on regionservers that serve a
+ // user table region. Therefore, if there are no user tables loaded,
+ // this coprocessor will not be loaded on any regionserver.
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ regionCoprocessor2.getName());
+
+ conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ regionServerCoprocessor.getName());
+ conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ masterCoprocessor.getName());
+ TEST_UTIL.startMiniCluster(1);
cluster = TEST_UTIL.getDFSCluster();
}
@@ -180,7 +219,7 @@ public class TestClassLoading {
LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
fs.copyFromLocalFile(new Path(jarFile2.getPath()),
- new Path(fs.getUri().toString() + Path.SEPARATOR));
+ new Path(fs.getUri().toString() + Path.SEPARATOR));
String jarFileOnHDFS2 = fs.getUri().toString() + Path.SEPARATOR +
jarFile2.getName();
assertTrue("Copy jar file to HDFS failed.",
@@ -204,10 +243,11 @@ public class TestClassLoading {
admin.createTable(htd);
// verify that the coprocessors were loaded
- boolean found1 = false, found2 = false, found2_k1 = false, found2_k2 = false,
- found2_k3 = false;
+ boolean found1 = false, found2 = false, found2_k1 = false,
+ found2_k2 = false, found2_k3 = false;
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
- for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ for (HRegion region:
+ hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
if (region.getRegionNameAsString().startsWith(tableName)) {
CoprocessorEnvironment env;
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
@@ -247,7 +287,8 @@ public class TestClassLoading {
// verify that the coprocessor was loaded
boolean found = false;
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
- for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ for (HRegion region:
+ hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
if (region.getRegionNameAsString().startsWith(cpName3)) {
found = (region.getCoprocessorHost().findCoprocessor(cpName3) != null);
}
@@ -310,7 +351,8 @@ public class TestClassLoading {
found5_k4 = false;
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
- for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ for (HRegion region:
+ hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
if (region.getRegionNameAsString().startsWith(tableName)) {
found_1 = found_1 ||
(region.getCoprocessorHost().findCoprocessor(cpName1) != null);
@@ -333,6 +375,7 @@ public class TestClassLoading {
}
}
}
+
assertTrue("Class " + cpName1 + " was missing on a region", found_1);
assertTrue("Class " + cpName2 + " was missing on a region", found_2);
assertTrue("Class SimpleRegionObserver was missing on a region", found_3);
@@ -344,4 +387,157 @@ public class TestClassLoading {
assertTrue("Configuration key 'k3' was missing on a region", found5_k3);
assertFalse("Configuration key 'k4' wasn't configured", found5_k4);
}
+
+ @Test
+ public void testRegionServerCoprocessorsReported() throws Exception {
+ // HBASE 4070: Improve region server metrics to report loaded coprocessors
+ // to master: verify that each regionserver is reporting the correct set of
+ // loaded coprocessors.
+
+ // We rely on the fact that getCoprocessors() will return a sorted
+ // display of the coprocessors' names, so for example, regionCoprocessor1's
+ // name "ColumnAggregationEndpoint" will appear before regionCoprocessor2's
+ // name "GenericEndpoint" because "C" is before "G" lexicographically.
+
+ HBaseAdmin admin = new HBaseAdmin(this.conf);
+
+ // disable all user tables, if any are loaded.
+ for (HTableDescriptor htd: admin.listTables()) {
+ if (!htd.isMetaTable()) {
+ String tableName = htd.getNameAsString();
+ if (admin.isTableEnabled(tableName)) {
+ try {
+ admin.disableTable(htd.getNameAsString());
+ } catch (TableNotEnabledException e) {
+ // ignoring this exception for now : not sure why it's happening.
+ }
+ }
+ }
+ }
+
+ // should only be system coprocessors loaded at this point.
+ assertAllRegionServers(regionServerSystemCoprocessors,null);
+
+ // The next two tests enable and disable user tables to see if coprocessor
+ // load reporting changes as coprocessors are loaded and unloaded.
+ //
+
+ // Create a table.
+ // should cause regionCoprocessor2 to be loaded, since we've specified it
+ // for loading on any user table with USER_REGION_COPROCESSOR_CONF_KEY
+ // in setUpBeforeClass().
+ String userTable1 = "userTable1";
+ HTableDescriptor userTD1 = new HTableDescriptor(userTable1);
+ admin.createTable(userTD1);
+ // table should be enabled now.
+ assertTrue(admin.isTableEnabled(userTable1));
+ assertAllRegionServers(regionServerSystemAndUserCoprocessors, userTable1);
+
+ // unload and make sure we're back to only system coprocessors again.
+ admin.disableTable(userTable1);
+ assertAllRegionServers(regionServerSystemCoprocessors,null);
+
+ // create another table, with its own specified coprocessor.
+ String userTable2 = "userTable2";
+ HTableDescriptor htd2 = new HTableDescriptor(userTable2);
+
+ String userTableCP = "userTableCP";
+ File jarFile1 = buildCoprocessorJar(userTableCP);
+ htd2.addFamily(new HColumnDescriptor("myfamily"));
+ htd2.setValue("COPROCESSOR$1", jarFile1.toString() + "|" + userTableCP +
+ "|" + Coprocessor.PRIORITY_USER);
+ admin.createTable(htd2);
+ // table should be enabled now.
+ assertTrue(admin.isTableEnabled(userTable2));
+
+ ArrayList existingCPsPlusNew =
+ new ArrayList(Arrays.asList(regionServerSystemAndUserCoprocessors));
+ existingCPsPlusNew.add(userTableCP);
+ String[] existingCPsPlusNewArray = new String[existingCPsPlusNew.size()];
+ assertAllRegionServers(existingCPsPlusNew.toArray(existingCPsPlusNewArray),
+ userTable2);
+
+ admin.disableTable(userTable2);
+ assertTrue(admin.isTableDisabled(userTable2));
+
+ // we should be back to only system coprocessors again.
+ assertAllRegionServers(regionServerSystemCoprocessors, null);
+
+ }
+
+ /**
+ * return the subset of all regionservers
+ * (actually returns set of HServerLoads)
+ * which host some region in a given table.
+ * used by assertAllRegionServers() below to
+ * test reporting of loaded coprocessors.
+ * @param tableName : given table.
+ * @return subset of all servers.
+ */
+ Map serversForTable(String tableName) {
+ Map serverLoadHashMap =
+ new HashMap();
+ for(Map.Entry server:
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
+ getOnlineServers().entrySet()) {
+ for(Map.Entry region:
+ server.getValue().getRegionsLoad().entrySet()) {
+ if (region.getValue().getNameAsString().equals(tableName)) {
+ // this server hosts a region of tableName: add this server..
+ serverLoadHashMap.put(server.getKey(),server.getValue());
+ // .. and skip the rest of the regions that it hosts.
+ break;
+ }
+ }
+ }
+ return serverLoadHashMap;
+ }
+
+ void assertAllRegionServers(String[] expectedCoprocessors, String tableName)
+ throws InterruptedException {
+ Map servers;
+ String[] actualCoprocessors = null;
+ boolean success = false;
+ for(int i = 0; i < 5; i++) {
+ if (tableName == null) {
+ //if no tableName specified, use all servers.
+ servers =
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
+ getOnlineServers();
+ } else {
+ servers = serversForTable(tableName);
+ }
+ boolean any_failed = false;
+ for(Map.Entry server: servers.entrySet()) {
+ actualCoprocessors = server.getValue().getCoprocessors();
+ if (!Arrays.equals(actualCoprocessors, expectedCoprocessors)) {
+ LOG.debug("failed comparison: actual: " +
+ Arrays.toString(actualCoprocessors) +
+ " ; expected: " + Arrays.toString(expectedCoprocessors));
+ any_failed = true;
+ break;
+ }
+ }
+ if (any_failed == false) {
+ success = true;
+ break;
+ }
+ LOG.debug("retrying after failed comparison: " + i);
+ Thread.sleep(1000);
+ }
+ assertTrue(success);
+ }
+
+ @Test
+ public void testMasterCoprocessorsReported() {
+ // HBASE 4070: Improve region server metrics to report loaded coprocessors
+ // to master: verify that the master is reporting the correct set of
+ // loaded coprocessors.
+ final String loadedMasterCoprocessorsVerify =
+ "[" + masterCoprocessor.getSimpleName() + "]";
+ String loadedMasterCoprocessors =
+ java.util.Arrays.toString(
+ TEST_UTIL.getHBaseCluster().getMaster().getCoprocessors());
+ assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors);
+ }
}