diff --git src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon index abeb850..ebe2154 100644 --- src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon +++ src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon @@ -99,6 +99,7 @@ org.apache.hadoop.hbase.HTableDescriptor; Fragmentation<% frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %>Overall fragmentation of all tables, including .META. and -ROOT-. Zookeeper Quorum<% master.getZooKeeperWatcher().getQuorum() %>Addresses of all registered ZK servers. For more, see zk dump. +Coprocessors<% master.generateCoprocessorString() %>Coprocessors currently loaded loaded by the master. <& ../common/TaskMonitorTmpl; filter = filter &> diff --git src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon index be6fceb..218357f 100644 --- src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -77,6 +77,7 @@ org.apache.hadoop.hbase.HRegionInfo; HBase Compiled<% org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <% org.apache.hadoop.hbase.util.VersionInfo.getUser() %>When HBase version was compiled and by whom Metrics<% metrics.toString() %>RegionServer Metrics; file and heap sizes are in megabytes Zookeeper Quorum<% regionServer.getZooKeeper().getQuorum() %>Addresses of all registered ZK servers +Coprocessors<% regionServer.generateCoprocessorString() %>Coprocessors currently loaded by this regionserver. <& ../common/TaskMonitorTmpl; filter = filter &> diff --git src/main/java/org/apache/hadoop/hbase/HServerLoad.java src/main/java/org/apache/hadoop/hbase/HServerLoad.java index 0c680e4..9bb5d7f 100644 --- src/main/java/org/apache/hadoop/hbase/HServerLoad.java +++ src/main/java/org/apache/hadoop/hbase/HServerLoad.java @@ -23,8 +23,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; @@ -55,6 +59,11 @@ implements WritableComparable { /** the maximum allowable size of the heap, in MB */ private int maxHeapMB = 0; + // region server level coprocessors, i.e., WALObserver implementation + private Set coprocessors = null; + + private String coprocessorString; + /** per-region load metrics */ private Map regionLoad = new TreeMap(Bytes.BYTES_COMPARATOR); @@ -114,6 +123,9 @@ implements WritableComparable { */ private int totalStaticBloomSizeKB; + private Set coprocessors; + private String coprocessorString; + /** * Constructor, for Writable */ @@ -133,6 +145,7 @@ implements WritableComparable { * @param writeRequestsCount * @param totalCompactingKVs * @param currentCompactedKVs + * @param coprocessors */ public RegionLoad(final byte[] name, final int stores, final int storefiles, final int storeUncompressedSizeMB, @@ -141,7 +154,8 @@ implements WritableComparable { final int rootIndexSizeKB, final int totalStaticIndexSizeKB, final int totalStaticBloomSizeKB, final int readRequestsCount, final int writeRequestsCount, - final long totalCompactingKVs, final long currentCompactedKVs) { + final long totalCompactingKVs, final long currentCompactedKVs, + final Set coprocessors) { this.name = name; this.stores = stores; this.storefiles = storefiles; @@ -156,10 +170,27 @@ implements WritableComparable { this.writeRequestsCount = writeRequestsCount; this.totalCompactingKVs = totalCompactingKVs; this.currentCompactedKVs = currentCompactedKVs; + this.coprocessors = coprocessors; + this.coprocessorString = getLoadedCoprocessors(); + } + + private String getLoadedCoprocessors() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + Iterator i = coprocessors.iterator(); + if (i.hasNext()) { + for (;;) { + CoprocessorEnvironment ce = i.next(); + sb.append(ce.getInstance().getClass().getSimpleName()); + if (! i.hasNext()) break; + sb.append(", "); + } + } + sb.append("]"); + return sb.toString(); } // Getters - /** * @return the region name */ @@ -332,6 +363,7 @@ implements WritableComparable { this.totalStaticBloomSizeKB = in.readInt(); this.totalCompactingKVs = in.readLong(); this.currentCompactedKVs = in.readLong(); + this.coprocessorString = in.readUTF(); } public void write(DataOutput out) throws IOException { @@ -352,6 +384,7 @@ implements WritableComparable { out.writeInt(totalStaticBloomSizeKB); out.writeLong(totalCompactingKVs); out.writeLong(currentCompactedKVs); + out.writeUTF(coprocessorString); } /** @@ -397,6 +430,7 @@ implements WritableComparable { } sb = Strings.appendKeyValue(sb, "compactionProgressPct", compactionProgressPct); + sb = Strings.appendKeyValue(sb, "coprocessors", this.coprocessorString); return sb.toString(); } } @@ -427,12 +461,15 @@ implements WritableComparable { */ public HServerLoad(final int totalNumberOfRequests, final int numberOfRequests, final int usedHeapMB, final int maxHeapMB, - final Map regionLoad) { + final Map regionLoad, + final Set walCoprocessors) { this.numberOfRequests = numberOfRequests; this.usedHeapMB = usedHeapMB; this.maxHeapMB = maxHeapMB; this.regionLoad = regionLoad; this.totalNumberOfRequests = totalNumberOfRequests; + this.coprocessors = walCoprocessors; + setCoprocessorString(this.regionLoad, this.coprocessors); } /** @@ -441,12 +478,55 @@ implements WritableComparable { */ public HServerLoad(final HServerLoad hsl) { this(hsl.totalNumberOfRequests, hsl.numberOfRequests, hsl.usedHeapMB, - hsl.maxHeapMB, hsl.getRegionsLoad()); + hsl.maxHeapMB, hsl.getRegionsLoad(), hsl.coprocessors); for (Map.Entry e : hsl.regionLoad.entrySet()) { this.regionLoad.put(e.getKey(), e.getValue()); } } + public String getCoprocessorString() { + return coprocessorString; + } + + /** + * Set coprocessorsString to a list of comma-separated coprocessors, enclosed in + * square brackets. + * (cf. {@link HMaster::setCoprocessorString()}). + */ + private void setCoprocessorString( + final Map rls, + Set rsCoprocessors) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + Comparator classNameComparator = + new Comparator() { + @Override + public int compare(CoprocessorEnvironment o1, CoprocessorEnvironment o2) { + return o1.getInstance().getClass().getSimpleName().compareTo( + o2.getInstance().getClass().getSimpleName()); + } + }; + Set allCoprocessors = + new TreeSet(classNameComparator); + for (Map.Entry rl : rls.entrySet()) { + allCoprocessors.addAll(rl.getValue().coprocessors); + } + allCoprocessors.addAll(rsCoprocessors); + + Iterator i = + allCoprocessors.iterator(); + if (i.hasNext()) { + for (;;) { + CoprocessorEnvironment ce = i.next(); + sb.append(ce.getInstance().getClass().getSimpleName()); + if (! i.hasNext()) break; + sb.append(", "); + } + } + sb.append("]"); + coprocessorString = sb.toString(); + } + /** * Originally, this method factored in the effect of requests going to the * server as well. However, this does not interact very well with the current @@ -457,9 +537,6 @@ implements WritableComparable { * @return load factor for this server */ public int getLoad() { - // int load = numberOfRequests == 0 ? 1 : numberOfRequests; - // load *= numberOfRegions == 0 ? 1 : numberOfRegions; - // return load; return this.regionLoad.size(); } @@ -487,6 +564,7 @@ implements WritableComparable { sb = Strings.appendKeyValue(sb, "usedHeapMB", Integer.valueOf(this.usedHeapMB)); sb = Strings.appendKeyValue(sb, "maxHeapMB", Integer.valueOf(maxHeapMB)); + sb = Strings.appendKeyValue(sb, "coprocessors", getCoprocessorString()); return sb.toString(); } @@ -591,6 +669,10 @@ implements WritableComparable { return count; } + public String getLoadedCoprocessors() { + return coprocessorString; + } + // Writable public void readFields(DataInput in) throws IOException { @@ -607,6 +689,7 @@ implements WritableComparable { regionLoad.put(rl.getName(), rl); } totalNumberOfRequests = in.readInt(); + coprocessorString = in.readUTF(); } public void write(DataOutput out) throws IOException { @@ -619,6 +702,7 @@ implements WritableComparable { for (RegionLoad rl: regionLoad.values()) rl.write(out); out.writeInt(totalNumberOfRequests); + out.writeUTF(coprocessorString); } // Comparable diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index a55a4b1..cb69ee9 100644 --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -82,7 +82,7 @@ public class HBaseAdmin implements Abortable, Closeable { private final long pause; private final int numRetries; // Some operations can take a long time such as disable of big table. - // numRetries is for 'normal' stuff... Mutliply by this factor when + // numRetries is for 'normal' stuff... Multiply by this factor when // want to wait a long time. private final int retryLongerMultiplier; private boolean aborted; @@ -1649,4 +1649,15 @@ public class HBaseAdmin implements Abortable, Closeable { sn.getHostname(), sn.getPort()); return rs.rollHLogWriter(); } + + public String getMasterCoprocessors() { + try { + return getMaster().generateCoprocessorString(); + } catch (MasterNotRunningException e) { + return "master not running."; + } catch (ZooKeeperConnectionException e) { + return "could not connect to Zookeeper."; + } + } + } diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index dbae4fd..481fe1c 100644 --- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -73,12 +73,24 @@ public abstract class CoprocessorHost { pathPrefix = UUID.randomUUID().toString(); } + /** + * Not to be confused with the per-object _coprocessors_ (above), + * coprocessorNames is static and stores the set of all coprocessors ever + * loaded by any thread in this JVM. It is strictly additive: coprocessors are + * added to coprocessorNames, by loadInstance() but are never removed, since + * the intention is to preserve a history of all loaded coprocessors for + * diagnosis in case of server crash (HBASE-4014). + */ private static Set coprocessorNames = Collections.synchronizedSet(new HashSet()); public static Set getLoadedCoprocessors() { return coprocessorNames; } + public SortedSet getCoprocessors() { + return coprocessors; + } + /** * Load system coprocessors. Read the class names from configuration. * Called by constructor. diff --git src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java index a069400..941a2f1 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java @@ -258,4 +258,6 @@ public interface HMasterInterface extends VersionedProtocol { */ public HTableDescriptor[] getHTableDescriptors(List tableNames); + public String generateCoprocessorString(); + } diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 270f3f3..f38157c 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -25,9 +25,10 @@ import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -35,6 +36,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -352,7 +354,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } /** - * Initilize all ZK based system trackers. + * Initialize all ZK based system trackers. * @throws IOException * @throws InterruptedException */ @@ -1505,10 +1507,34 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } /** + * @return list of comma-separated coprocessors, enclosed in + * square brackets. + * (cf. {@link HServerLoad::setCoprocessorString()}). + */ + public String generateCoprocessorString() { + Set coprocessors = + this.getCoprocessorHost().getCoprocessors(); + StringBuilder sb = new StringBuilder(); + sb.append("["); + Iterator i = coprocessors.iterator(); + if (i.hasNext()) { + for (;;) { + CoprocessorEnvironment ce = i.next(); + sb.append(ce.getInstance().getClass().getSimpleName()); + if (! i.hasNext()) break; + sb.append(", "); + } + } + sb.append("]"); + return sb.toString(); + } + + /** * @see org.apache.hadoop.hbase.master.HMasterCommandLine */ public static void main(String [] args) throws Exception { VersionInfo.logVersion(); new HMasterCommandLine(HMaster.class).doMain(args); } + } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 96b763b..dda708c 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -788,7 +788,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); return new HServerLoad(requestCount.get(),(int)metrics.getRequests(), (int)(memory.getUsed() / 1024 / 1024), - (int) (memory.getMax() / 1024 / 1024), regionLoads); + (int) (memory.getMax() / 1024 / 1024), regionLoads, + this.hlog.getCoprocessorHost().getCoprocessors()); } String getOnlineRegionsAsPrintableString() { @@ -987,7 +988,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB, totalStaticIndexSizeKB, totalStaticBloomSizeKB, (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(), - totalCompactingKVs, currentCompactedKVs); + totalCompactingKVs, currentCompactedKVs, + r.getCoprocessorHost().getCoprocessors()); } /** @@ -3238,4 +3240,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, HLog wal = this.getWAL(); return wal.rollWriter(true); } + + public String generateCoprocessorString() { + HServerLoad hsl = buildServerLoad(); + if (hsl != null) { + return hsl.getCoprocessorString(); + } else { + LOG.error("Could not get a HServerLoad for this regionserver: returning " + + "an empty string."); + } + return ""; + } } diff --git src/main/ruby/hbase/admin.rb src/main/ruby/hbase/admin.rb index b244ffe..a6365c2 100644 --- src/main/ruby/hbase/admin.rb +++ src/main/ruby/hbase/admin.rb @@ -381,6 +381,7 @@ module Hbase for k, v in status.getRegionsInTransition() puts(" %s" % [v]) end + puts("master coprocessors: %s" % [@admin.getMasterCoprocessors() ]) puts("%d live servers" % [ status.getServersSize() ]) for server in status.getServers() puts(" %s:%d %d" % \ diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java index a8f2a9c..c26e00a 100644 --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.coprocessor; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Map; @@ -28,11 +29,13 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.junit.AfterClass; @@ -50,20 +53,26 @@ public class TestCoprocessorEndpoint { private static byte[] ROW = Bytes.toBytes("testRow"); private static final int ROWSIZE = 20; - private static final int rowSeperator1 = 5; - private static final int rowSeperator2 = 12; + private static final int rowSeparator1 = 5; + private static final int rowSeparator2 = 12; private static byte[][] ROWS = makeN(ROW, ROWSIZE); private static HBaseTestingUtility util = new HBaseTestingUtility(); private static MiniHBaseCluster cluster = null; + private static Class coprocessor1 = ColumnAggregationEndpoint.class; + private static Class coprocessor2 = GenericEndpoint.class; + private static Class masterCoprocessor = BaseMasterObserver.class; + @BeforeClass public static void setupBeforeClass() throws Exception { // set configure to indicate which cp should be loaded Configuration conf = util.getConfiguration(); conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - "org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint", - "org.apache.hadoop.hbase.coprocessor.GenericEndpoint"); + coprocessor1.getName(), coprocessor2.getName()); + + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + masterCoprocessor.getName()); util.startMiniCluster(2); cluster = util.getMiniHBaseCluster(); @@ -71,7 +80,7 @@ public class TestCoprocessorEndpoint { HTable table = util.createTable(TEST_TABLE, TEST_FAMILY); util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY, new byte[][] { HConstants.EMPTY_BYTE_ARRAY, - ROWS[rowSeperator1], ROWS[rowSeperator2] }); + ROWS[rowSeparator1], ROWS[rowSeparator2] }); for (int i = 0; i < ROWSIZE; i++) { Put put = new Put(ROWS[i]); @@ -126,7 +135,7 @@ public class TestCoprocessorEndpoint { // scan: for all regions results = table .coprocessorExec(ColumnAggregationProtocol.class, - ROWS[rowSeperator1 - 1], ROWS[rowSeperator2 + 1], + ROWS[rowSeparator1 - 1], ROWS[rowSeparator2 + 1], new Batch.Call() { public Long call(ColumnAggregationProtocol instance) throws IOException { @@ -148,7 +157,7 @@ public class TestCoprocessorEndpoint { // scan: for region 2 and region 3 results = table .coprocessorExec(ColumnAggregationProtocol.class, - ROWS[rowSeperator1 + 1], ROWS[rowSeperator2 + 1], + ROWS[rowSeparator1 + 1], ROWS[rowSeparator2 + 1], new Batch.Call() { public Long call(ColumnAggregationProtocol instance) throws IOException { @@ -160,12 +169,52 @@ public class TestCoprocessorEndpoint { for (Map.Entry e : results.entrySet()) { sumResult += e.getValue(); } - for (int i = rowSeperator1; i < ROWSIZE; i++) { + for (int i = rowSeparator1; i < ROWSIZE; i++) { expectedResult += i; } assertEquals("Invalid result", sumResult, expectedResult); } + @Test + public void testRegionServerCoprocessorReported() { + // HBASE 4070: Improve region server metrics to report loaded coprocessors + // to master: verify that each regionserver is reporting the correct set of + // loaded coprocessors. + // TODO: test display of regionserver-level coprocessors + // (e.g. SampleRegionWALObserver) versus region-level coprocessors + // (e.g. GenericEndpoint), and + // test CoprocessorHost.REGION_COPROCESSOR_CONF_KEY versus + // CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY. + // Enabling and disabling user tables to see if coprocessor display changes + // as coprocessors are loaded and enabled consequently. + + // Allow either ordering: since this is a set, either order is ok. + // Note the space [ ] after the comma in both constant strings: + // must be present for success of this test. + final String loadedCoprocessorsOrder1 = + "[" + coprocessor1.getSimpleName() + ", " + coprocessor2.getSimpleName() + "]"; + final String loadedCoprocessorsOrder2 = + "[" + coprocessor2.getSimpleName() + ", " + coprocessor1.getSimpleName() + "]"; + for(Map.Entry server : + util.getMiniHBaseCluster().getMaster().getServerManager().getOnlineServers().entrySet()) { + String regionServerCoprocessors = server.getValue().getLoadedCoprocessors(); + assertTrue(regionServerCoprocessors.equals(loadedCoprocessorsOrder1) || + regionServerCoprocessors.equals(loadedCoprocessorsOrder2)); + } + } + + @Test + public void testMasterCoprocessorsReported() { + // HBASE 4070: Improve region server metrics to report loaded coprocessors + // to master: verify that the master is reporting the correct set of + // loaded coprocessors. + final String loadedMasterCoprocessorsVerify = + "[" + masterCoprocessor.getSimpleName() + "]"; + String loadedMasterCoprocessors = + util.getMiniHBaseCluster().getMaster().generateCoprocessorString(); + assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors); + } + private static byte[][] makeN(byte[] base, int n) { byte[][] ret = new byte[n][]; for (int i = 0; i < n; i++) {