diff --git src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon index abeb850..d5be799 100644 --- src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon +++ src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon @@ -99,6 +99,7 @@ org.apache.hadoop.hbase.HTableDescriptor; Fragmentation<% frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %>Overall fragmentation of all tables, including .META. and -ROOT-. Zookeeper Quorum<% master.getZooKeeperWatcher().getQuorum() %>Addresses of all registered ZK servers. For more, see zk dump. +Coprocessors<% java.util.Arrays.toString(master.getCoprocessorNames()) %>Coprocessors currently loaded loaded by the master <& ../common/TaskMonitorTmpl; filter = filter &> diff --git src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon index be6fceb..be5829d 100644 --- src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -77,6 +77,7 @@ org.apache.hadoop.hbase.HRegionInfo; HBase Compiled<% org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <% org.apache.hadoop.hbase.util.VersionInfo.getUser() %>When HBase version was compiled and by whom Metrics<% metrics.toString() %>RegionServer Metrics; file and heap sizes are in megabytes Zookeeper Quorum<% regionServer.getZooKeeper().getQuorum() %>Addresses of all registered ZK servers +Coprocessors<% java.util.Arrays.toString(regionServer.getCoprocessorNames()) %>Coprocessors currently loaded by this regionserver <& ../common/TaskMonitorTmpl; filter = filter &> diff --git src/main/java/org/apache/hadoop/hbase/ClusterStatus.java src/main/java/org/apache/hadoop/hbase/ClusterStatus.java index 01bc1dd..8d8b715 100644 --- src/main/java/org/apache/hadoop/hbase/ClusterStatus.java +++ src/main/java/org/apache/hadoop/hbase/ClusterStatus.java @@ -66,6 +66,7 @@ public class ClusterStatus extends VersionedWritable { private Collection deadServers; private Map intransition; private String clusterId; + private String[] masterCoprocessorNames; /** * Constructor, for Writable @@ -76,12 +77,14 @@ public class ClusterStatus extends VersionedWritable { public ClusterStatus(final String hbaseVersion, final String clusterid, final Map servers, - final Collection deadServers, final Map rit) { + final Collection deadServers, final Map rit, + final String[] masterCoprocessorNames) { this.hbaseVersion = hbaseVersion; this.liveServers = servers; this.deadServers = deadServers; this.intransition = rit; this.clusterId = clusterid; + this.masterCoprocessorNames = masterCoprocessorNames; } /** @@ -155,7 +158,8 @@ public class ClusterStatus extends VersionedWritable { return (getVersion() == ((ClusterStatus)o).getVersion()) && getHBaseVersion().equals(((ClusterStatus)o).getHBaseVersion()) && this.liveServers.equals(((ClusterStatus)o).liveServers) && - deadServers.equals(((ClusterStatus)o).deadServers); + deadServers.equals(((ClusterStatus)o).deadServers) && + this.masterCoprocessorNames.equals(((ClusterStatus)o).masterCoprocessorNames); } /** @@ -205,6 +209,10 @@ public class ClusterStatus extends VersionedWritable { return clusterId; } + public String[] getMasterCoprocessorNames() { + return masterCoprocessorNames; + } + // // Writable // @@ -227,6 +235,10 @@ public class ClusterStatus extends VersionedWritable { e.getValue().write(out); } out.writeUTF(clusterId); + out.writeInt(masterCoprocessorNames.length); + for(String masterCoprocessor: masterCoprocessorNames) { + out.writeUTF(masterCoprocessor); + } } public void readFields(DataInput in) throws IOException { @@ -254,5 +266,11 @@ public class ClusterStatus extends VersionedWritable { this.intransition.put(key, regionState); } this.clusterId = in.readUTF(); + int masterCoprocessorNamesLength = in.readInt(); + masterCoprocessorNames = new String[masterCoprocessorNamesLength]; + for(int i = 0; i < masterCoprocessorNamesLength; i++) { + masterCoprocessorNames[i] = in.readUTF(); + } } -} \ No newline at end of file +} + diff --git src/main/java/org/apache/hadoop/hbase/HServerLoad.java src/main/java/org/apache/hadoop/hbase/HServerLoad.java index 0c680e4..da68347 100644 --- src/main/java/org/apache/hadoop/hbase/HServerLoad.java +++ src/main/java/org/apache/hadoop/hbase/HServerLoad.java @@ -22,9 +22,13 @@ package org.apache.hadoop.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; @@ -55,6 +59,35 @@ implements WritableComparable { /** the maximum allowable size of the heap, in MB */ private int maxHeapMB = 0; + // region server level coprocessors, e.g., WALObserver implementations. + // region-level coprocessors are stored inside RegionLoad objects. + private Set coprocessors = null; + + static Comparator stringComparator = + new Comparator() { + @Override + public int compare(String o1, String o2) { + return o1.compareTo(o2); + } + }; + + Set coprocessorNames = + new TreeSet(stringComparator); + + public String[] getCoprocessorNames() { + if (coprocessors != null) { + for (CoprocessorEnvironment environment: coprocessors) { + coprocessorNames.add(environment.getInstance().getClass().getSimpleName()); + } + } + for (Map.Entry rls: getRegionsLoad().entrySet()) { + for (String coprocessorName: rls.getValue().getCoprocessorNames()) { + coprocessorNames.add(coprocessorName); + } + } + return coprocessorNames.toArray(new String[0]); + } + /** per-region load metrics */ private Map regionLoad = new TreeMap(Bytes.BYTES_COMPARATOR); @@ -114,6 +147,9 @@ implements WritableComparable { */ private int totalStaticBloomSizeKB; + private Set coprocessors; + private String[] coprocessorNames = new String[0]; + /** * Constructor, for Writable */ @@ -133,6 +169,7 @@ implements WritableComparable { * @param writeRequestsCount * @param totalCompactingKVs * @param currentCompactedKVs + * @param coprocessors */ public RegionLoad(final byte[] name, final int stores, final int storefiles, final int storeUncompressedSizeMB, @@ -141,7 +178,8 @@ implements WritableComparable { final int rootIndexSizeKB, final int totalStaticIndexSizeKB, final int totalStaticBloomSizeKB, final int readRequestsCount, final int writeRequestsCount, - final long totalCompactingKVs, final long currentCompactedKVs) { + final long totalCompactingKVs, final long currentCompactedKVs, + final Set coprocessors) { this.name = name; this.stores = stores; this.storefiles = storefiles; @@ -156,9 +194,23 @@ implements WritableComparable { this.writeRequestsCount = writeRequestsCount; this.totalCompactingKVs = totalCompactingKVs; this.currentCompactedKVs = currentCompactedKVs; + this.coprocessors = coprocessors; } // Getters + private String[] getCoprocessorNames() { + if (coprocessors != null) { + ArrayList coprocessorStrings = new ArrayList(); + for (CoprocessorEnvironment environment: coprocessors) { + coprocessorStrings.add(environment.getInstance().getClass().getSimpleName()); + } + coprocessorNames = coprocessorStrings.toArray(new String[0]); + return coprocessorNames; + } + else { + return coprocessorNames; + } + } /** * @return the region name @@ -332,6 +384,11 @@ implements WritableComparable { this.totalStaticBloomSizeKB = in.readInt(); this.totalCompactingKVs = in.readLong(); this.currentCompactedKVs = in.readLong(); + int coprocessorsSize = in.readInt(); + coprocessorNames = new String[coprocessorsSize]; + for (int i = 0; i < coprocessorsSize; i++) { + coprocessorNames[i] = in.readUTF(); + } } public void write(DataOutput out) throws IOException { @@ -352,6 +409,21 @@ implements WritableComparable { out.writeInt(totalStaticBloomSizeKB); out.writeLong(totalCompactingKVs); out.writeLong(currentCompactedKVs); + if (coprocessors != null) { + out.writeInt(coprocessors.size()); + for (CoprocessorEnvironment env: coprocessors) { + out.writeUTF(env.getInstance().getClass().getSimpleName()); + } + } else { + if (coprocessorNames != null) { + out.writeInt(coprocessorNames.length); + for (String coprocessorName: coprocessorNames) { + out.writeUTF(coprocessorName); + } + } else { + out.writeInt(0); + } + } } /** @@ -397,6 +469,7 @@ implements WritableComparable { } sb = Strings.appendKeyValue(sb, "compactionProgressPct", compactionProgressPct); + sb = Strings.appendKeyValue(sb, "coprocessors", java.util.Arrays.toString(getCoprocessorNames())); return sb.toString(); } } @@ -424,15 +497,18 @@ implements WritableComparable { * @param numberOfRequests * @param usedHeapMB * @param maxHeapMB + * @param coprocessors : coprocessors loaded at the regionserver-level */ public HServerLoad(final int totalNumberOfRequests, final int numberOfRequests, final int usedHeapMB, final int maxHeapMB, - final Map regionLoad) { + final Map regionLoad, + final Set coprocessors) { this.numberOfRequests = numberOfRequests; this.usedHeapMB = usedHeapMB; this.maxHeapMB = maxHeapMB; this.regionLoad = regionLoad; this.totalNumberOfRequests = totalNumberOfRequests; + this.coprocessors = coprocessors; } /** @@ -441,7 +517,7 @@ implements WritableComparable { */ public HServerLoad(final HServerLoad hsl) { this(hsl.totalNumberOfRequests, hsl.numberOfRequests, hsl.usedHeapMB, - hsl.maxHeapMB, hsl.getRegionsLoad()); + hsl.maxHeapMB, hsl.getRegionsLoad(), hsl.coprocessors); for (Map.Entry e : hsl.regionLoad.entrySet()) { this.regionLoad.put(e.getKey(), e.getValue()); } @@ -487,6 +563,7 @@ implements WritableComparable { sb = Strings.appendKeyValue(sb, "usedHeapMB", Integer.valueOf(this.usedHeapMB)); sb = Strings.appendKeyValue(sb, "maxHeapMB", Integer.valueOf(maxHeapMB)); + sb = Strings.appendKeyValue(sb, "coprocessors", java.util.Arrays.toString(getCoprocessorNames())); return sb.toString(); } @@ -607,6 +684,10 @@ implements WritableComparable { regionLoad.put(rl.getName(), rl); } totalNumberOfRequests = in.readInt(); + int coprocessorsSize = in.readInt(); + for(int i = 0; i < coprocessorsSize; i++) { + coprocessorNames.add(in.readUTF()); + } } public void write(DataOutput out) throws IOException { @@ -619,6 +700,11 @@ implements WritableComparable { for (RegionLoad rl: regionLoad.values()) rl.write(out); out.writeInt(totalNumberOfRequests); + String[] loadedCoprocessors = getCoprocessorNames(); + out.writeInt(loadedCoprocessors.length); + for (String coprocessorName: loadedCoprocessors) { + out.writeUTF(coprocessorName); + } } // Comparable diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index a55a4b1..ea26112 100644 --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1649,4 +1649,13 @@ public class HBaseAdmin implements Abortable, Closeable { sn.getHostname(), sn.getPort()); return rs.rollHLogWriter(); } + + public String[] getMasterCoprocessorNames() { + try { + return getClusterStatus().getMasterCoprocessorNames(); + } catch (IOException e) { + LOG.error("Could not getClusterStatus()",e); + return null; + } + } } diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index dbae4fd..f3743a8 100644 --- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -73,6 +73,14 @@ public abstract class CoprocessorHost { pathPrefix = UUID.randomUUID().toString(); } + /** + * Not to be confused with the per-object _coprocessors_ (above), + * coprocessorNames is static and stores the set of all coprocessors ever + * loaded by any thread in this JVM. It is strictly additive: coprocessors are + * added to coprocessorNames, by loadInstance() but are never removed, since + * the intention is to preserve a history of all loaded coprocessors for + * diagnosis in case of server crash (HBASE-4014). + */ private static Set coprocessorNames = Collections.synchronizedSet(new HashSet()); public static Set getLoadedCoprocessors() { @@ -80,6 +88,16 @@ public abstract class CoprocessorHost { } /** + * Used by @see{HRegionServer#buildServerLoad()} and + * @see{HRegionServer#createRegionLoad()} + * to report loaded coprocessors. + * @return set of coprocessor environments. + */ + public SortedSet getCoprocessors() { + return coprocessors; + } + + /** * Load system coprocessors. Read the class names from configuration. * Called by constructor. */ diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f80d232..6747158 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -352,7 +353,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } /** - * Initilize all ZK based system trackers. + * Initialize all ZK based system trackers. * @throws IOException * @throws InterruptedException */ @@ -1183,7 +1184,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.fileSystemManager.getClusterId(), this.serverManager.getOnlineServers(), this.serverManager.getDeadServers(), - this.assignmentManager.getRegionsInTransition()); + this.assignmentManager.getRegionsInTransition(), + this.getCoprocessorNames()); } public String getClusterId() { @@ -1201,6 +1203,20 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return CoprocessorHost.getLoadedCoprocessors().toString(); } + /** + * @return array of coprocessor SimpleNames. + */ + public String[] getCoprocessorNames() { + Set masterCoprocessors = + getCoprocessorHost().getCoprocessors(); + String[] returnValue = new String[masterCoprocessors.size()]; + int i = 0; + for (MasterCoprocessorHost.MasterEnvironment e: masterCoprocessors) { + returnValue[i++] = e.getInstance().getClass().getSimpleName(); + } + return returnValue; + } + @Override public void abort(final String msg, final Throwable t) { if (cpHost != null) { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 96b763b..b78492e 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -788,7 +788,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); return new HServerLoad(requestCount.get(),(int)metrics.getRequests(), (int)(memory.getUsed() / 1024 / 1024), - (int) (memory.getMax() / 1024 / 1024), regionLoads); + (int) (memory.getMax() / 1024 / 1024), regionLoads, + this.hlog.getCoprocessorHost().getCoprocessors()); } String getOnlineRegionsAsPrintableString() { @@ -987,7 +988,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB, totalStaticIndexSizeKB, totalStaticBloomSizeKB, (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(), - totalCompactingKVs, currentCompactedKVs); + totalCompactingKVs, currentCompactedKVs, + r.getCoprocessorHost().getCoprocessors()); } /** @@ -3238,4 +3240,14 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, HLog wal = this.getWAL(); return wal.rollWriter(true); } + + // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). + public String[] getCoprocessorNames() { + HServerLoad hsl = buildServerLoad(); + if (hsl != null) { + return hsl.getCoprocessorNames(); + } else { + return null; + } + } } diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorReporting.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorReporting.java new file mode 100644 index 0000000..42c9471 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorReporting.java @@ -0,0 +1,153 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HServerLoad; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * TestCoprocessorReporting: test cases to verify that loaded coprocessors on + * master and regionservers are reported correctly. + */ +public class TestCoprocessorReporting { + + private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); + private static byte[] ROW = Bytes.toBytes("testRow"); + + private static final int ROWSIZE = 20; + private static final int rowSeparator1 = 5; + private static final int rowSeparator2 = 12; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + private static MiniHBaseCluster cluster = null; + + private static Class regionCoprocessor1 = ColumnAggregationEndpoint.class; + private static Class regionCoprocessor2 = GenericEndpoint.class; + private static Class regionServerCoprocessor = SampleRegionWALObserver.class; + private static Class masterCoprocessor = BaseMasterObserver.class; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + // set configure to indicate which cp should be loaded + Configuration conf = util.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + regionCoprocessor1.getName(), regionCoprocessor2.getName()); + conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + regionServerCoprocessor.getName()); + + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + masterCoprocessor.getName()); + + util.startMiniCluster(2); + cluster = util.getMiniHBaseCluster(); + + HTable table = util.createTable(TEST_TABLE, TEST_FAMILY); + util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY, + new byte[][] { HConstants.EMPTY_BYTE_ARRAY, + ROWS[rowSeparator1], ROWS[rowSeparator2] }); + + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i)); + table.put(put); + } + + // sleep here is an ugly hack to allow region transitions to finish + Thread.sleep(5000); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testRegionServerCoprocessorsReported() { + // HBASE 4070: Improve region server metrics to report loaded coprocessors + // to master: verify that each regionserver is reporting the correct set of + // loaded coprocessors. + // TODO: test display of regionserver-level coprocessors + // (e.g. SampleRegionWALObserver) versus region-level coprocessors + // (e.g. GenericEndpoint), and + // test CoprocessorHost.REGION_COPROCESSOR_CONF_KEY versus + // CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY. + // Test enabling and disabling user tables to see if coprocessor display + // changes as coprocessors are consequently loaded and unloaded. + + // We rely on the fact that getLoadedCoprocessors() will return a sorted + // display of the coprocessors' names, so coprocessor1's name + // "ColumnAggregationEndpoint" will appear before coprocessor2's name + // "GenericEndpoint" because "C" is before "G" lexicographically. + // Note the space [ ] after the comma in both constant strings: + // must be present for success of this test. + final String loadedCoprocessorsExpected = + "[" + regionCoprocessor1.getSimpleName() + ", " + + regionCoprocessor2.getSimpleName() + ", " + + regionServerCoprocessor.getSimpleName() + "]"; + for(Map.Entry server : + util.getMiniHBaseCluster().getMaster().getServerManager().getOnlineServers().entrySet()) { + String regionServerCoprocessors = + java.util.Arrays.toString(server.getValue().getCoprocessorNames()); + assertTrue(regionServerCoprocessors.equals(loadedCoprocessorsExpected)); + } + } + + @Test + public void testMasterCoprocessorsReported() { + // HBASE 4070: Improve region server metrics to report loaded coprocessors + // to master: verify that the master is reporting the correct set of + // loaded coprocessors. + final String loadedMasterCoprocessorsVerify = + "[" + masterCoprocessor.getSimpleName() + "]"; + String loadedMasterCoprocessors = + java.util.Arrays.toString(util.getHBaseCluster().getMaster().getCoprocessorNames()); + assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i))); + } + return ret; + } +}