From 2fe0064ff8007c561977e285e6d9d19eac0d6736 Mon Sep 17 00:00:00 2001 From: Zhiqiu Kong Date: Mon, 6 Feb 2012 21:43:17 -0800 Subject: [PATCH] [jira] [HBASE-5292] Prevent counting getSize on compactions Summary: Added two separate metrics for both get() and next(). This is done by refactoring the internal next() API. To be more specific, only Get.get() and ResultScanner.next() pass the metric name ("getsize" and "nextsize" respectively) to HRegion::RegionScanner::next(List, String). This will eventually hit StoreScanner::next(List, int, String) where the metrics are counted. And their call paths are: 1) Get HTable::get(final Get get) => HRegionServer::get(byte [] regionName, Get get) => HRegion::get(final Get get, final Integer lockid) => HRegion::get(final Get get) [pass METRIC_GETSIZE to the callee] => HRegion::RegionScanner::next(List outResults, String metric) => HRegion::RegionScanner::next(List outResults, int limit, String metric) => HRegion::RegionScanner::nextInternal(int limit, String metric) => KeyValueHeap::next(List result, int limit, String metric) => StoreScanner::next(List outResult, int limit, String metric) 2) Next HTable::ClientScanner::next() => ScannerCallable::call() => HRegionServer::next(long scannerId) => HRegionServer::next(final long scannerId, int nbRows) [pass METRIC_NEXTSIZE to the callee] => HRegion::RegionScanner::next(List outResults, String metric) => HRegion::RegionScanner::next(List outResults, int limit, String metric) => HRegion::RegionScanner::nextInternal(int limit, String metric) => KeyValueHeap::next(List result, int limit, String metric) => StoreScanner::next(List outResult, int limit, String metric) Task ID: #898948 Blame Rev: Test Plan: 1. Passed unit tests. 2. 
Created a testcase TestRegionServerMetrics::testGetNextSize to guarantee: * Get/Next contributes to getsize/nextsize metrics * Both getsize/nextsize are per Column Family * Flush/compaction won't affect these two metrics Revert Plan: Tags: Reviewers: Kannan, mbautin, Liyin, JIRA CC: Kannan, mbautin, Liyin, zhiqiu Differential Revision: https://reviews.facebook.net/D1617 --- .../apache/hadoop/hbase/regionserver/HRegion.java | 35 ++++++-- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hadoop/hbase/regionserver/InternalScanner.java | 20 +++++ .../hadoop/hbase/regionserver/KeyValueHeap.java | 23 +++++- .../hadoop/hbase/regionserver/StoreScanner.java | 39 ++++++++-- .../coprocessor/TestCoprocessorInterface.java | 12 +++ .../coprocessor/TestRegionObserverInterface.java | 17 ++++- .../regionserver/TestRegionServerMetrics.java | 83 +++++++++++++++++++- 8 files changed, 210 insertions(+), 21 deletions(-) diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 25cb31d..c219edc 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -194,7 +195,7 @@ public class HRegion implements HeapSize { // , Writable{ // Registered region protocol handlers private ClassToInstanceMap protocolHandlers = MutableClassToInstanceMap.create(); - + private Map> protocolHandlerNames = Maps.newHashMap(); @@ -325,6 +326,9 @@ public class HRegion implements HeapSize { // , Writable{ // for simple numeric metrics (# of blocks read from block cache) public static final ConcurrentMap numericMetrics = new ConcurrentHashMap(); + public static final String METRIC_GETSIZE = "getsize"; + 
public static final String METRIC_NEXTSIZE = "nextsize"; + // for simple numeric metrics (current block cache size) // These ones are not reset to zero when queried, unlike the previous. public static final ConcurrentMap numericPersistentMetrics = new ConcurrentHashMap(); @@ -334,7 +338,7 @@ public class HRegion implements HeapSize { // , Writable{ * number of operations. */ public static final ConcurrentMap> - timeVaryingMetrics = new ConcurrentHashMap>(); public static void incrNumericMetric(String key, long amount) { @@ -931,7 +935,7 @@ public class HRegion implements HeapSize { // , Writable{ CompletionService> completionService = new ExecutorCompletionService>( storeCloserThreadPool); - + // close each store in parallel for (final Store store : stores.values()) { completionService @@ -2873,7 +2877,7 @@ public class HRegion implements HeapSize { // , Writable{ return currentEditSeqId; } finally { status.cleanup(); - if (reader != null) { + if (reader != null) { reader.close(); } } @@ -3315,6 +3319,12 @@ public class HRegion implements HeapSize { // , Writable{ @Override public synchronized boolean next(List outResults, int limit) throws IOException { + return next(outResults, limit, null); + } + + @Override + public synchronized boolean next(List outResults, int limit, + String metric) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. 
Could be caused by a very slow scanner " + @@ -3329,7 +3339,7 @@ public class HRegion implements HeapSize { // , Writable{ results.clear(); - boolean returnResult = nextInternal(limit); + boolean returnResult = nextInternal(limit, metric); outResults.addAll(results); resetFilters(); @@ -3346,7 +3356,14 @@ public class HRegion implements HeapSize { // , Writable{ public synchronized boolean next(List outResults) throws IOException { // apply the batching limit by default - return next(outResults, batch); + return next(outResults, batch, null); + } + + @Override + public synchronized boolean next(List outResults, String metric) + throws IOException { + // apply the batching limit by default + return next(outResults, batch, metric); } /* @@ -3356,7 +3373,7 @@ public class HRegion implements HeapSize { // , Writable{ return this.filter != null && this.filter.filterAllRemaining(); } - private boolean nextInternal(int limit) throws IOException { + private boolean nextInternal(int limit, String metric) throws IOException { while (true) { byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { @@ -3373,7 +3390,7 @@ public class HRegion implements HeapSize { // , Writable{ } else { byte [] nextRow; do { - this.storeHeap.next(results, limit - results.size()); + this.storeHeap.next(results, limit - results.size(), metric); if (limit > 0 && results.size() == limit) { if (this.filter != null && filter.hasFilterRow()) { throw new IncompatibleFilterException( @@ -4122,7 +4139,7 @@ public class HRegion implements HeapSize { // , Writable{ RegionScanner scanner = null; try { scanner = getScanner(scan); - scanner.next(results); + scanner.next(results, HRegion.METRIC_GETSIZE); } finally { if (scanner != null) scanner.close(); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5cb606f..6a5e0d0 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java 
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2349,7 +2349,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, && currentScanResultSize < maxScannerResultSize; i++) { requestCount.incrementAndGet(); // Collect values to be returned here - boolean moreRows = s.next(values); + boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE); if (!values.isEmpty()) { for (KeyValue kv : values) { currentScanResultSize += kv.heapSize(); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index 0f5f36c..6cbed5a 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -47,6 +47,15 @@ public interface InternalScanner extends Closeable { * @throws IOException e */ public boolean next(List results) throws IOException; + + /** + * Grab the next row's worth of values. + * @param results return output array + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean next(List results, String metric) throws IOException; /** * Grab the next row's worth of values with a limit on the number of values @@ -57,6 +66,17 @@ public interface InternalScanner extends Closeable { * @throws IOException e */ public boolean next(List result, int limit) throws IOException; + + /** + * Grab the next row's worth of values with a limit on the number of values + * to return. 
+ * @param result return output array + * @param limit limit on row count to get + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean next(List result, int limit, String metric) throws IOException; /** * Closes the scanner and releases any resources it has allocated diff --git src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 87883a0..f34e035 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -120,11 +120,27 @@ public class KeyValueHeap extends NonLazyKeyValueScanner * @return true if there are more keys, false if all scanners are done */ public boolean next(List result, int limit) throws IOException { + return next(result, limit, null); + } + + /** + * Gets the next row of keys from the top-most scanner. + *

+ * This method takes care of updating the heap. + *

+ * This can ONLY be called when you are using Scanners that implement + * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}). + * @param result output result list + * @param limit limit on row count to get + * @param metric the metric name + * @return true if there are more keys, false if all scanners are done + */ + public boolean next(List result, int limit, String metric) throws IOException { if (this.current == null) { return false; } InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean mayContainMoreRows = currentAsInternal.next(result, limit); + boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric); KeyValue pee = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no @@ -156,6 +172,11 @@ public class KeyValueHeap extends NonLazyKeyValueScanner return next(result, -1); } + @Override + public boolean next(List result, String metric) throws IOException { + return next(result, -1, metric); + } + private static class KVScannerComparator implements Comparator { private KVComparator kvComparator; /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index f492f00..84bbeae 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -48,8 +48,8 @@ class StoreScanner extends NonLazyKeyValueScanner private KeyValueHeap heap; private boolean cacheBlocks; - private String metricNameGetSize; + private String metricNamePrefix; // Used to indicate that the scanner has closed (see HBASE-1107) // Doesnt need to be volatile because it's always accessed via synchronized methods private boolean closing = false; @@ -196,7 +196,7 @@ class StoreScanner extends NonLazyKeyValueScanner /** * Method used internally to initialize metric names throughout the * constructors. 
- * + * * To be called after the store variable has been initialized! */ private void initializeMetricNames() { @@ -206,8 +206,8 @@ class StoreScanner extends NonLazyKeyValueScanner tableName = store.getTableName(); family = Bytes.toString(store.getFamily().getName()); } - metricNameGetSize = SchemaMetrics.generateSchemaMetricsPrefix( - tableName, family) + "getsize"; + this.metricNamePrefix = + SchemaMetrics.generateSchemaMetricsPrefix(tableName, family); } /** @@ -306,6 +306,18 @@ class StoreScanner extends NonLazyKeyValueScanner */ @Override public synchronized boolean next(List outResult, int limit) throws IOException { + return next(outResult, limit, null); + } + + /** + * Get the next row of values from this Store. + * @param outResult + * @param limit + * @return true if there are more rows, false if scanner is done + */ + @Override + public synchronized boolean next(List outResult, int limit, + String metric) throws IOException { if (checkReseek()) { return true; @@ -353,7 +365,15 @@ class StoreScanner extends NonLazyKeyValueScanner case INCLUDE_AND_SEEK_NEXT_COL: Filter f = matcher.getFilter(); - results.add(f == null ? 
kv : f.transform(kv)); + if (f != null) { + kv = f.transform(kv); + } + results.add(kv); + + if (metric != null) { + HRegion.incrNumericMetric(this.metricNamePrefix + metric, + kv.getLength()); + } if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { if (!matcher.moreRowsMayExistAfter(kv)) { @@ -367,7 +387,6 @@ class StoreScanner extends NonLazyKeyValueScanner this.heap.next(); } - HRegion.incrNumericMetric(metricNameGetSize, kv.getLength()); if (limit > 0 && (results.size() == limit)) { break LOOP; } @@ -432,7 +451,13 @@ class StoreScanner extends NonLazyKeyValueScanner @Override public synchronized boolean next(List outResult) throws IOException { - return next(outResult, -1); + return next(outResult, -1, null); + } + + @Override + public synchronized boolean next(List outResult, String metric) + throws IOException { + return next(outResult, -1, metric); } // Implementation of ChangedReadersObserver diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 545d107..f516f8c 100644 --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -63,11 +63,23 @@ public class TestCoprocessorInterface extends HBaseTestCase { public boolean next(List results) throws IOException { return delegate.next(results); } + + @Override + public boolean next(List results, String metric) + throws IOException { + return delegate.next(results, metric); + } @Override public boolean next(List result, int limit) throws IOException { return delegate.next(result, limit); } + + @Override + public boolean next(List result, int limit, String metric) + throws IOException { + return delegate.next(result, limit, metric); + } @Override public void close() throws IOException { diff --git 
src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 88ad220..0f788e2 100644 --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -299,13 +299,26 @@ public class TestRegionObserverInterface { public boolean next(List results) throws IOException { return next(results, -1); } + + @Override + public boolean next(List results, String metric) + throws IOException { + return next(results, -1, metric); + } + + @Override + public boolean next(List results, int limit) + throws IOException{ + return next(results, limit, null); + } @Override - public boolean next(List results, int limit) throws IOException { + public boolean next(List results, int limit, String metric) + throws IOException { List internalResults = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(internalResults, limit); + hasMore = scanner.next(internalResults, limit, metric); if (!internalResults.isEmpty()) { long row = Bytes.toLong(internalResults.get(0).getRow()); if (row % 2 == 0) { diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 6560672..d2fd2ff 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -27,7 +27,13 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import 
org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics. StoreMetricType; @@ -57,7 +63,7 @@ public class TestRegionServerMetrics { private static final SchemaMetrics ALL_METRICS = SchemaMetrics.ALL_SCHEMA_METRICS; - private static final HBaseTestingUtility TEST_UTIL = + private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private Map startingMetrics; @@ -131,5 +137,80 @@ public class TestRegionServerMetrics { @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); + + private void assertSizeMetric(String table, String[] cfs, int[] metrics) { + // we have getsize & nextsize for each column family + assertEquals(cfs.length * 2, metrics.length); + + for (int i =0; i < cfs.length; ++i) { + String prefix = SchemaMetrics.generateSchemaMetricsPrefix(table, cfs[i]); + String getMetric = prefix + HRegion.METRIC_GETSIZE; + String nextMetric = prefix + HRegion.METRIC_NEXTSIZE; + + // verify getsize and nextsize matches + int getSize = HRegion.numericMetrics.containsKey(getMetric) ? + HRegion.numericMetrics.get(getMetric).intValue() : 0; + int nextSize = HRegion.numericMetrics.containsKey(nextMetric) ? 
+ HRegion.numericMetrics.get(nextMetric).intValue() : 0; + + assertEquals(metrics[i], getSize); + assertEquals(metrics[cfs.length + i], nextSize); + } + } + + @Test + public void testGetNextSize() throws IOException, InterruptedException { + String rowName = "row1"; + byte[] ROW = Bytes.toBytes(rowName); + String tableName = "SizeMetricTest"; + byte[] TABLE = Bytes.toBytes(tableName); + String cf1Name = "cf1"; + String cf2Name = "cf2"; + String[] cfs = new String[] {cf1Name, cf2Name}; + byte[] CF1 = Bytes.toBytes(cf1Name); + byte[] CF2 = Bytes.toBytes(cf2Name); + + long ts = 1234; + HTable hTable = TEST_UTIL.createTable(TABLE, new byte[][]{CF1, CF2}); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + + Put p = new Put(ROW); + p.add(CF1, CF1, ts, CF1); + p.add(CF2, CF2, ts, CF2); + hTable.put(p); + + KeyValue kv1 = new KeyValue(ROW, CF1, CF1, ts, CF1); + KeyValue kv2 = new KeyValue(ROW, CF2, CF2, ts, CF2); + int kvLength = kv1.getLength(); + assertEquals(kvLength, kv2.getLength()); + + // only cf1.getsize is set on Get + hTable.get(new Get(ROW).addFamily(CF1)); + assertSizeMetric(tableName, cfs, new int[] {kvLength, 0, 0, 0}); + + // only cf2.getsize is set on Get + hTable.get(new Get(ROW).addFamily(CF2)); + assertSizeMetric(tableName, cfs, new int[] {kvLength, kvLength, 0, 0}); + + // only cf2.nextsize is set + for (Result res : hTable.getScanner(CF2)) { + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, 0, kvLength}); + + // only cf2.nextsize is set + for (Result res : hTable.getScanner(CF1)) { + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, kvLength, kvLength}); + + // getsize/nextsize should not be set on flush or compaction + for (HRegion hr : TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE)) { + hr.flushcache(); + hr.compactStores(); + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, kvLength, kvLength}); + } } -- 1.7.8.4