diff --git src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon index abeb850..43832c8 100644 --- src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon +++ src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon @@ -99,6 +99,12 @@ org.apache.hadoop.hbase.HTableDescriptor; Fragmentation<% frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %>Overall fragmentation of all tables, including .META. and -ROOT-. Zookeeper Quorum<% master.getZooKeeperWatcher().getQuorum() %>Addresses of all registered ZK servers. For more, see zk dump. + + + Coprocessors<% java.util.Arrays.toString(master.getCoprocessors()) %> + + Coprocessors currently loaded loaded by the master + <& ../common/TaskMonitorTmpl; filter = filter &> diff --git src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon index be6fceb..1d7860f 100644 --- src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -77,6 +77,13 @@ org.apache.hadoop.hbase.HRegionInfo; HBase Compiled<% org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <% org.apache.hadoop.hbase.util.VersionInfo.getUser() %>When HBase version was compiled and by whom Metrics<% metrics.toString() %>RegionServer Metrics; file and heap sizes are in megabytes Zookeeper Quorum<% regionServer.getZooKeeper().getQuorum() %>Addresses of all registered ZK servers + + Coprocessors + + <% java.util.Arrays.toString(regionServer.getCoprocessors()) %> + + Coprocessors currently loaded by this regionserver + <& ../common/TaskMonitorTmpl; filter = filter &> diff --git src/main/java/org/apache/hadoop/hbase/ClusterStatus.java src/main/java/org/apache/hadoop/hbase/ClusterStatus.java index 01bc1dd..f64a54d 100644 --- src/main/java/org/apache/hadoop/hbase/ClusterStatus.java +++ src/main/java/org/apache/hadoop/hbase/ClusterStatus.java @@ -24,6 +24,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -66,6 +67,7 @@ public class ClusterStatus extends VersionedWritable { private Collection deadServers; private Map intransition; private String clusterId; + private String[] masterCoprocessors; /** * Constructor, for Writable @@ -76,12 +78,14 @@ public class ClusterStatus extends VersionedWritable { public ClusterStatus(final String hbaseVersion, final String clusterid, final Map servers, - final Collection deadServers, final Map rit) { + final Collection deadServers, final Map rit, + final String[] masterCoprocessors) { this.hbaseVersion = hbaseVersion; this.liveServers = servers; this.deadServers = deadServers; this.intransition = rit; this.clusterId = clusterid; + this.masterCoprocessors = masterCoprocessors; } /** @@ -155,7 +159,8 @@ public class ClusterStatus extends VersionedWritable { return (getVersion() == ((ClusterStatus)o).getVersion()) && getHBaseVersion().equals(((ClusterStatus)o).getHBaseVersion()) && this.liveServers.equals(((ClusterStatus)o).liveServers) && - deadServers.equals(((ClusterStatus)o).deadServers); + deadServers.equals(((ClusterStatus)o).deadServers) && + Arrays.equals(this.masterCoprocessors, ((ClusterStatus)o).masterCoprocessors); } /** @@ -205,6 +210,10 @@ public class ClusterStatus extends VersionedWritable { return clusterId; } + public String[] getMasterCoprocessors() { + return masterCoprocessors; + } + // // Writable // @@ -227,6 +236,10 @@ public class ClusterStatus extends VersionedWritable { e.getValue().write(out); } out.writeUTF(clusterId); + out.writeInt(masterCoprocessors.length); + for(String masterCoprocessor: masterCoprocessors) { + out.writeUTF(masterCoprocessor); + } } public void readFields(DataInput in) throws IOException { @@ -254,5 +267,10 @@ public class ClusterStatus extends VersionedWritable { this.intransition.put(key, regionState); } this.clusterId = in.readUTF(); + int masterCoprocessorsLength = in.readInt(); + masterCoprocessors = new String[masterCoprocessorsLength]; + for(int i = 0; i < masterCoprocessorsLength; i++) { + masterCoprocessors[i] = in.readUTF(); + } } -} \ No newline at end of file +} diff --git src/main/java/org/apache/hadoop/hbase/HServerLoad.java src/main/java/org/apache/hadoop/hbase/HServerLoad.java index 0c680e4..ee1c6c8 100644 --- src/main/java/org/apache/hadoop/hbase/HServerLoad.java +++ src/main/java/org/apache/hadoop/hbase/HServerLoad.java @@ -22,9 +22,12 @@ package org.apache.hadoop.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; @@ -55,6 +58,33 @@ implements WritableComparable { /** the maximum allowable size of the heap, in MB */ private int maxHeapMB = 0; + // Regionserver-level coprocessors, e.g., WALObserver implementations. + // Region-level coprocessors, on the other hand, are stored inside RegionLoad + // objects. + private Set coprocessors = + new TreeSet(); + + /** + * HBASE-4070: Improve region server metrics to report loaded coprocessors. + * + * @return Returns the set of all coprocessors on this + * regionserver, where this set is the union of the + * regionserver-level coprocessors on one hand, and all of the region-level + * coprocessors, on the other. + * + * We must iterate through all regions loaded on this regionserver to + * obtain all of the region-level coprocessors. + */ + public String[] getCoprocessors() { + TreeSet returnValue = new TreeSet(coprocessors); + for (Map.Entry rls: getRegionsLoad().entrySet()) { + for (String coprocessor: rls.getValue().getCoprocessors()) { + returnValue.add(coprocessor); + } + } + return returnValue.toArray(new String[0]); + } + /** per-region load metrics */ private Map regionLoad = new TreeMap(Bytes.BYTES_COMPARATOR); @@ -114,6 +144,10 @@ implements WritableComparable { */ private int totalStaticBloomSizeKB; + // Region-level coprocessors. + Set coprocessors = + new TreeSet(); + /** * Constructor, for Writable */ @@ -133,6 +167,7 @@ implements WritableComparable { * @param writeRequestsCount * @param totalCompactingKVs * @param currentCompactedKVs + * @param coprocessors */ public RegionLoad(final byte[] name, final int stores, final int storefiles, final int storeUncompressedSizeMB, @@ -141,7 +176,8 @@ implements WritableComparable { final int rootIndexSizeKB, final int totalStaticIndexSizeKB, final int totalStaticBloomSizeKB, final int readRequestsCount, final int writeRequestsCount, - final long totalCompactingKVs, final long currentCompactedKVs) { + final long totalCompactingKVs, final long currentCompactedKVs, + final Set coprocessors) { this.name = name; this.stores = stores; this.storefiles = storefiles; @@ -156,9 +192,13 @@ implements WritableComparable { this.writeRequestsCount = writeRequestsCount; this.totalCompactingKVs = totalCompactingKVs; this.currentCompactedKVs = currentCompactedKVs; + this.coprocessors = coprocessors; } // Getters + private String[] getCoprocessors() { + return coprocessors.toArray(new String[0]); + } /** * @return the region name @@ -332,6 +372,11 @@ implements WritableComparable { this.totalStaticBloomSizeKB = in.readInt(); this.totalCompactingKVs = in.readLong(); this.currentCompactedKVs = in.readLong(); + int coprocessorsSize = in.readInt(); + coprocessors = new TreeSet(); + for (int i = 0; i < coprocessorsSize; i++) { + coprocessors.add(in.readUTF()); + } } public void write(DataOutput out) throws IOException { @@ -352,6 +397,10 @@ implements WritableComparable { out.writeInt(totalStaticBloomSizeKB); out.writeLong(totalCompactingKVs); out.writeLong(currentCompactedKVs); + out.writeInt(coprocessors.size()); + for (String coprocessor: coprocessors) { + out.writeUTF(coprocessor); + } } /** @@ -397,6 +446,11 @@ implements WritableComparable { } sb = Strings.appendKeyValue(sb, "compactionProgressPct", compactionProgressPct); + String coprocessors = Arrays.toString(getCoprocessors()); + if (coprocessors != null) { + sb = Strings.appendKeyValue(sb, "coprocessors", + Arrays.toString(getCoprocessors())); + } return sb.toString(); } } @@ -424,15 +478,18 @@ implements WritableComparable { * @param numberOfRequests * @param usedHeapMB * @param maxHeapMB + * @param coprocessors : coprocessors loaded at the regionserver-level */ public HServerLoad(final int totalNumberOfRequests, final int numberOfRequests, final int usedHeapMB, final int maxHeapMB, - final Map regionLoad) { + final Map regionLoad, + final Set coprocessors) { this.numberOfRequests = numberOfRequests; this.usedHeapMB = usedHeapMB; this.maxHeapMB = maxHeapMB; this.regionLoad = regionLoad; this.totalNumberOfRequests = totalNumberOfRequests; + this.coprocessors = coprocessors; } /** @@ -441,7 +498,7 @@ implements WritableComparable { */ public HServerLoad(final HServerLoad hsl) { this(hsl.totalNumberOfRequests, hsl.numberOfRequests, hsl.usedHeapMB, - hsl.maxHeapMB, hsl.getRegionsLoad()); + hsl.maxHeapMB, hsl.getRegionsLoad(), hsl.coprocessors); for (Map.Entry e : hsl.regionLoad.entrySet()) { this.regionLoad.put(e.getKey(), e.getValue()); } @@ -487,6 +544,10 @@ implements WritableComparable { sb = Strings.appendKeyValue(sb, "usedHeapMB", Integer.valueOf(this.usedHeapMB)); sb = Strings.appendKeyValue(sb, "maxHeapMB", Integer.valueOf(maxHeapMB)); + String coprocessors = Arrays.toString(getCoprocessors()); + if (coprocessors != null) { + sb = Strings.appendKeyValue(sb, "coprocessors", coprocessors); + } return sb.toString(); } @@ -607,6 +668,10 @@ implements WritableComparable { regionLoad.put(rl.getName(), rl); } totalNumberOfRequests = in.readInt(); + int coprocessorsSize = in.readInt(); + for(int i = 0; i < coprocessorsSize; i++) { + coprocessors.add(in.readUTF()); + } } public void write(DataOutput out) throws IOException { @@ -619,6 +684,10 @@ implements WritableComparable { for (RegionLoad rl: regionLoad.values()) rl.write(out); out.writeInt(totalNumberOfRequests); + out.writeInt(coprocessors.size()); + for (String coprocessor: coprocessors) { + out.writeUTF(coprocessor); + } } // Comparable diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 92c959c..e10d7e9 100644 --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1647,4 +1647,13 @@ public class HBaseAdmin implements Abortable, Closeable { sn.getHostname(), sn.getPort()); return rs.rollHLogWriter(); } + + public String[] getMasterCoprocessors() { + try { + return getClusterStatus().getMasterCoprocessors(); + } catch (IOException e) { + LOG.error("Could not getClusterStatus()",e); + return null; + } + } } diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 7d2f82e..d5d555c 100644 --- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -73,6 +73,14 @@ public abstract class CoprocessorHost { pathPrefix = UUID.randomUUID().toString(); } + /** + * Not to be confused with the per-object _coprocessors_ (above), + * coprocessorNames is static and stores the set of all coprocessors ever + * loaded by any thread in this JVM. It is strictly additive: coprocessors are + * added to coprocessorNames, by loadInstance() but are never removed, since + * the intention is to preserve a history of all loaded coprocessors for + * diagnosis in case of server crash (HBASE-4014). + */ private static Set coprocessorNames = Collections.synchronizedSet(new HashSet()); public static Set getLoadedCoprocessors() { @@ -80,6 +88,21 @@ public abstract class CoprocessorHost { } /** + * Used to create a parameter to the HServerLoad constructor so that + * HServerLoad can provide information about the coprocessors loaded by this + * regionserver. + * (HBASE-4070: Improve region server metrics to report loaded coprocessors + * to master). + */ + public Set getCoprocessors() { + Set returnValue = new TreeSet(); + for(CoprocessorEnvironment e: coprocessors) { + returnValue.add(e.getInstance().getClass().getSimpleName()); + } + return returnValue; + } + + /** * Load system coprocessors. Read the class names from configuration. * Called by constructor. */ diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 50b49a6..7c1c24b 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -345,7 +346,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } /** - * Initilize all ZK based system trackers. + * Initialize all ZK based system trackers. * @throws IOException * @throws InterruptedException */ @@ -1137,7 +1138,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.fileSystemManager.getClusterId(), this.serverManager.getOnlineServers(), this.serverManager.getDeadServers(), - this.assignmentManager.getRegionsInTransition()); + this.assignmentManager.getRegionsInTransition(), + this.getCoprocessors()); } public String getClusterId() { @@ -1155,6 +1157,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return CoprocessorHost.getLoadedCoprocessors().toString(); } + /** + * @return array of coprocessor SimpleNames. + */ + public String[] getCoprocessors() { + Set masterCoprocessors = + getCoprocessorHost().getCoprocessors(); + return masterCoprocessors.toArray(new String[0]); + } + @Override public void abort(final String msg, final Throwable t) { if (cpHost != null) { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e2e694a..effcb6e 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -795,7 +795,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); return new HServerLoad(requestCount.get(),(int)metrics.getRequests(), (int)(memory.getUsed() / 1024 / 1024), - (int) (memory.getMax() / 1024 / 1024), regionLoads); + (int) (memory.getMax() / 1024 / 1024), regionLoads, + this.hlog.getCoprocessorHost().getCoprocessors()); } String getOnlineRegionsAsPrintableString() { @@ -994,7 +995,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB, totalStaticIndexSizeKB, totalStaticBloomSizeKB, (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(), - totalCompactingKVs, currentCompactedKVs); + totalCompactingKVs, currentCompactedKVs, + r.getCoprocessorHost().getCoprocessors()); } /** @@ -3277,4 +3279,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, HLog wal = this.getWAL(); return wal.rollWriter(true); } + + // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). + public String[] getCoprocessors() { + HServerLoad hsl = buildServerLoad(); + return hsl == null? null: hsl.getCoprocessors(); + } } diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java index eda5a9b..cdbba7e 100644 --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java @@ -25,12 +25,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,10 +41,12 @@ import org.apache.hadoop.fs.Path; import javax.tools.*; import java.io.*; import java.util.*; +import java.util.Arrays; import java.util.jar.*; import org.junit.*; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -63,10 +68,44 @@ public class TestClassLoading { static final String cpName4 = "TestCP4"; static final String cpName5 = "TestCP5"; + private static Class regionCoprocessor1 = ColumnAggregationEndpoint.class; + private static Class regionCoprocessor2 = GenericEndpoint.class; + private static Class regionServerCoprocessor = SampleRegionWALObserver.class; + private static Class masterCoprocessor = BaseMasterObserver.class; + + private static final String[] regionServerSystemCoprocessors = + new String[]{ + regionCoprocessor1.getSimpleName(), + regionServerCoprocessor.getSimpleName() + }; + + private static final String[] regionServerSystemAndUserCoprocessors = + new String[] { + regionCoprocessor1.getSimpleName(), + regionCoprocessor2.getSimpleName(), + regionServerCoprocessor.getSimpleName() + }; + @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(1); conf = TEST_UTIL.getConfiguration(); + + // regionCoprocessor1 will be loaded on all regionservers, since it is + // loaded for any tables (user or meta). + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + regionCoprocessor1.getName()); + + // regionCoprocessor2 will be loaded only on regionservers that serve a + // user table region. Therefore, if there are no user tables loaded, + // this coprocessor will not be loaded on any regionserver. + conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + regionCoprocessor2.getName()); + + conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + regionServerCoprocessor.getName()); + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + masterCoprocessor.getName()); + TEST_UTIL.startMiniCluster(1); cluster = TEST_UTIL.getDFSCluster(); } @@ -180,7 +219,7 @@ public class TestClassLoading { LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1); fs.copyFromLocalFile(new Path(jarFile2.getPath()), - new Path(fs.getUri().toString() + Path.SEPARATOR)); + new Path(fs.getUri().toString() + Path.SEPARATOR)); String jarFileOnHDFS2 = fs.getUri().toString() + Path.SEPARATOR + jarFile2.getName(); assertTrue("Copy jar file to HDFS failed.", @@ -204,10 +243,11 @@ public class TestClassLoading { admin.createTable(htd); // verify that the coprocessors were loaded - boolean found1 = false, found2 = false, found2_k1 = false, found2_k2 = false, - found2_k3 = false; + boolean found1 = false, found2 = false, found2_k1 = false, + found2_k2 = false, found2_k3 = false; MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); - for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { + for (HRegion region: + hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { if (region.getRegionNameAsString().startsWith(tableName)) { CoprocessorEnvironment env; env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1); @@ -247,7 +287,8 @@ public class TestClassLoading { // verify that the coprocessor was loaded boolean found = false; MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); - for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { + for (HRegion region: + hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { if (region.getRegionNameAsString().startsWith(cpName3)) { found = (region.getCoprocessorHost().findCoprocessor(cpName3) != null); } @@ -310,7 +351,8 @@ public class TestClassLoading { found5_k4 = false; MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); - for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { + for (HRegion region: + hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { if (region.getRegionNameAsString().startsWith(tableName)) { found_1 = found_1 || (region.getCoprocessorHost().findCoprocessor(cpName1) != null); @@ -333,6 +375,7 @@ public class TestClassLoading { } } } + assertTrue("Class " + cpName1 + " was missing on a region", found_1); assertTrue("Class " + cpName2 + " was missing on a region", found_2); assertTrue("Class SimpleRegionObserver was missing on a region", found_3); @@ -344,4 +387,157 @@ public class TestClassLoading { assertTrue("Configuration key 'k3' was missing on a region", found5_k3); assertFalse("Configuration key 'k4' wasn't configured", found5_k4); } + + @Test + public void testRegionServerCoprocessorsReported() throws Exception { + // HBASE 4070: Improve region server metrics to report loaded coprocessors + // to master: verify that each regionserver is reporting the correct set of + // loaded coprocessors. + + // We rely on the fact that getCoprocessors() will return a sorted + // display of the coprocessors' names, so for example, regionCoprocessor1's + // name "ColumnAggregationEndpoint" will appear before regionCoprocessor2's + // name "GenericEndpoint" because "C" is before "G" lexicographically. + + HBaseAdmin admin = new HBaseAdmin(this.conf); + + // disable all user tables, if any are loaded. + for (HTableDescriptor htd: admin.listTables()) { + if (!htd.isMetaTable()) { + String tableName = htd.getNameAsString(); + if (admin.isTableEnabled(tableName)) { + try { + admin.disableTable(htd.getNameAsString()); + } catch (TableNotEnabledException e) { + // ignoring this exception for now : not sure why it's happening. + } + } + } + } + + // should only be system coprocessors loaded at this point. + assertAllRegionServers(regionServerSystemCoprocessors,null); + + // The next two tests enable and disable user tables to see if coprocessor + // load reporting changes as coprocessors are loaded and unloaded. + // + + // Create a table. + // should cause regionCoprocessor2 to be loaded, since we've specified it + // for loading on any user table with USER_REGION_COPROCESSOR_CONF_KEY + // in setUpBeforeClass(). + String userTable1 = "userTable1"; + HTableDescriptor userTD1 = new HTableDescriptor(userTable1); + admin.createTable(userTD1); + // table should be enabled now. + assertTrue(admin.isTableEnabled(userTable1)); + assertAllRegionServers(regionServerSystemAndUserCoprocessors, userTable1); + + // unload and make sure we're back to only system coprocessors again. + admin.disableTable(userTable1); + assertAllRegionServers(regionServerSystemCoprocessors,null); + + // create another table, with its own specified coprocessor. + String userTable2 = "userTable2"; + HTableDescriptor htd2 = new HTableDescriptor(userTable2); + + String userTableCP = "userTableCP"; + File jarFile1 = buildCoprocessorJar(userTableCP); + htd2.addFamily(new HColumnDescriptor("myfamily")); + htd2.setValue("COPROCESSOR$1", jarFile1.toString() + "|" + userTableCP + + "|" + Coprocessor.PRIORITY_USER); + admin.createTable(htd2); + // table should be enabled now. + assertTrue(admin.isTableEnabled(userTable2)); + + ArrayList existingCPsPlusNew = + new ArrayList(Arrays.asList(regionServerSystemAndUserCoprocessors)); + existingCPsPlusNew.add(userTableCP); + String[] existingCPsPlusNewArray = new String[existingCPsPlusNew.size()]; + assertAllRegionServers(existingCPsPlusNew.toArray(existingCPsPlusNewArray), + userTable2); + + admin.disableTable(userTable2); + assertTrue(admin.isTableDisabled(userTable2)); + + // we should be back to only system coprocessors again. + assertAllRegionServers(regionServerSystemCoprocessors, null); + + } + + /** + * return the subset of all regionservers + * (actually returns set of HServerLoads) + * which host some region in a given table. + * used by assertAllRegionServers() below to + * test reporting of loaded coprocessors. + * @param tableName : given table. + * @return subset of all servers. + */ + Map serversForTable(String tableName) { + Map serverLoadHashMap = + new HashMap(); + for(Map.Entry server: + TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager(). + getOnlineServers().entrySet()) { + for(Map.Entry region: + server.getValue().getRegionsLoad().entrySet()) { + if (region.getValue().getNameAsString().equals(tableName)) { + // this server server hosts a region of tableName: add this server.. + serverLoadHashMap.put(server.getKey(),server.getValue()); + // .. and skip the rest of the regions that it hosts. + break; + } + } + } + return serverLoadHashMap; + } + + void assertAllRegionServers(String[] expectedCoprocessors, String tableName) + throws InterruptedException { + Map servers; + String[] actualCoprocessors = null; + boolean success = false; + for(int i = 0; i < 5; i++) { + if (tableName == null) { + //if no tableName specified, use all servers. + servers = + TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager(). + getOnlineServers(); + } else { + servers = serversForTable(tableName); + } + boolean any_failed = false; + for(Map.Entry server: servers.entrySet()) { + actualCoprocessors = server.getValue().getCoprocessors(); + if (!Arrays.equals(actualCoprocessors, expectedCoprocessors)) { + LOG.debug("failed comparison: actual: " + + Arrays.toString(actualCoprocessors) + + " ; expected: " + Arrays.toString(expectedCoprocessors)); + any_failed = true; + break; + } + } + if (any_failed == false) { + success = true; + break; + } + LOG.debug("retrying after failed comparison: " + i); + Thread.sleep(1000); + } + assertTrue(success); + } + + @Test + public void testMasterCoprocessorsReported() { + // HBASE 4070: Improve region server metrics to report loaded coprocessors + // to master: verify that the master is reporting the correct set of + // loaded coprocessors. + final String loadedMasterCoprocessorsVerify = + "[" + masterCoprocessor.getSimpleName() + "]"; + String loadedMasterCoprocessors = + java.util.Arrays.toString( + TEST_UTIL.getHBaseCluster().getMaster().getCoprocessors()); + assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors); + } }