From 719acc0bac111daa067fe93285bc4eb9df03f5e2 Mon Sep 17 00:00:00 2001 From: mbautin Date: Fri, 9 Mar 2012 12:23:51 -0800 Subject: [PATCH] [jira] [HBASE-5292] Prevent counting getSize on compactions Author: Zhiqiu Kong Summary: Added two separate metrics, one for get() and one for next(). This is done by refactoring the internal next() API. To be more specific, only Get.get() and ResultScanner.next() pass the metric name ("getsize" and "nextsize" respectively) to HRegion::RegionScanner::next(List, String). This eventually reaches StoreScanner::next(List, int, String), where the metrics are counted. Their call paths are: 1) Get HTable::get(final Get get) => HRegionServer::get(byte [] regionName, Get get) => HRegion::get(final Get get, final Integer lockid) => HRegion::get(final Get get) [pass METRIC_GETSIZE to the callee] => HRegion::RegionScanner::next(List outResults, String metric) => HRegion::RegionScanner::next(List outResults, int limit, String metric) => HRegion::RegionScanner::nextInternal(int limit, String metric) => KeyValueHeap::next(List result, int limit, String metric) => StoreScanner::next(List outResult, int limit, String metric) 2) Next HTable::ClientScanner::next() => ScannerCallable::call() => HRegionServer::next(long scannerId) => HRegionServer::next(final long scannerId, int nbRows) [pass METRIC_NEXTSIZE to the callee] => HRegion::RegionScanner::next(List outResults, String metric) => HRegion::RegionScanner::next(List outResults, int limit, String metric) => HRegion::RegionScanner::nextInternal(int limit, String metric) => KeyValueHeap::next(List result, int limit, String metric) => StoreScanner::next(List outResult, int limit, String metric) Test Plan: 1. Passed unit tests. 2. 
Created a testcase TestRegionServerMetrics::testGetNextSize to guarantee: * Get/Next contributes to getsize/nextsize metrics * Both getsize/nextsize are per Column Family * Flush/compaction won't affect these two metrics Reviewed By: mbautin Reviewers: Kannan, mbautin, Liyin, JIRA CC: Kannan, mbautin, Liyin, zhiqiu Differential Revision: https://reviews.facebook.net/D1617 --- .../apache/hadoop/hbase/regionserver/HRegion.java | 35 ++++++-- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hadoop/hbase/regionserver/InternalScanner.java | 20 +++++ .../hadoop/hbase/regionserver/KeyValueHeap.java | 23 +++++- .../hadoop/hbase/regionserver/StoreScanner.java | 39 ++++++++-- .../coprocessor/TestCoprocessorInterface.java | 12 +++ .../coprocessor/TestRegionObserverInterface.java | 17 ++++- .../regionserver/TestRegionServerMetrics.java | 83 +++++++++++++++++++- 8 files changed, 210 insertions(+), 21 deletions(-) diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8208abf..859d3c2 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -200,7 +201,7 @@ public class HRegion implements HeapSize { // , Writable{ // Registered region protocol handlers private ClassToInstanceMap protocolHandlers = MutableClassToInstanceMap.create(); - + private Map> protocolHandlerNames = Maps.newHashMap(); @@ -333,6 +334,9 @@ public class HRegion implements HeapSize { // , Writable{ // for simple numeric metrics (# of blocks read from block cache) public static final ConcurrentMap numericMetrics = new ConcurrentHashMap(); + public static final String METRIC_GETSIZE = "getsize"; + 
public static final String METRIC_NEXTSIZE = "nextsize"; + // for simple numeric metrics (current block cache size) // These ones are not reset to zero when queried, unlike the previous. public static final ConcurrentMap numericPersistentMetrics = new ConcurrentHashMap(); @@ -342,7 +346,7 @@ public class HRegion implements HeapSize { // , Writable{ * number of operations. */ public static final ConcurrentMap> - timeVaryingMetrics = new ConcurrentHashMap>(); public static void incrNumericMetric(String key, long amount) { @@ -958,7 +962,7 @@ public class HRegion implements HeapSize { // , Writable{ CompletionService> completionService = new ExecutorCompletionService>( storeCloserThreadPool); - + // close each store in parallel for (final Store store : stores.values()) { completionService @@ -2903,7 +2907,7 @@ public class HRegion implements HeapSize { // , Writable{ return currentEditSeqId; } finally { status.cleanup(); - if (reader != null) { + if (reader != null) { reader.close(); } } @@ -3345,6 +3349,12 @@ public class HRegion implements HeapSize { // , Writable{ @Override public synchronized boolean next(List outResults, int limit) throws IOException { + return next(outResults, limit, null); + } + + @Override + public synchronized boolean next(List outResults, int limit, + String metric) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. 
Could be caused by a very slow scanner " + @@ -3359,7 +3369,7 @@ public class HRegion implements HeapSize { // , Writable{ results.clear(); - boolean returnResult = nextInternal(limit); + boolean returnResult = nextInternal(limit, metric); outResults.addAll(results); resetFilters(); @@ -3376,7 +3386,14 @@ public class HRegion implements HeapSize { // , Writable{ public synchronized boolean next(List outResults) throws IOException { // apply the batching limit by default - return next(outResults, batch); + return next(outResults, batch, null); + } + + @Override + public synchronized boolean next(List outResults, String metric) + throws IOException { + // apply the batching limit by default + return next(outResults, batch, metric); } /* @@ -3386,7 +3403,7 @@ public class HRegion implements HeapSize { // , Writable{ return this.filter != null && this.filter.filterAllRemaining(); } - private boolean nextInternal(int limit) throws IOException { + private boolean nextInternal(int limit, String metric) throws IOException { while (true) { byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { @@ -3403,7 +3420,7 @@ public class HRegion implements HeapSize { // , Writable{ } else { byte [] nextRow; do { - this.storeHeap.next(results, limit - results.size()); + this.storeHeap.next(results, limit - results.size(), metric); if (limit > 0 && results.size() == limit) { if (this.filter != null && filter.hasFilterRow()) { throw new IncompatibleFilterException( @@ -4160,7 +4177,7 @@ public class HRegion implements HeapSize { // , Writable{ RegionScanner scanner = null; try { scanner = getScanner(scan); - scanner.next(results); + scanner.next(results, HRegion.METRIC_GETSIZE); } finally { if (scanner != null) scanner.close(); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5094da5..a9a5610 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java 
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2383,7 +2383,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, && currentScanResultSize < maxScannerResultSize; i++) { requestCount.incrementAndGet(); // Collect values to be returned here - boolean moreRows = s.next(values); + boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE); if (!values.isEmpty()) { for (KeyValue kv : values) { currentScanResultSize += kv.heapSize(); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index 4e91743..162ee07 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -49,6 +49,15 @@ public interface InternalScanner extends Closeable { * @throws IOException e */ public boolean next(List results) throws IOException; + + /** + * Grab the next row's worth of values. + * @param results return output array + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean next(List results, String metric) throws IOException; /** * Grab the next row's worth of values with a limit on the number of values @@ -59,6 +68,17 @@ public interface InternalScanner extends Closeable { * @throws IOException e */ public boolean next(List result, int limit) throws IOException; + + /** + * Grab the next row's worth of values with a limit on the number of values + * to return. 
+ * @param result return output array + * @param limit limit on row count to get + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean next(List result, int limit, String metric) throws IOException; /** * Closes the scanner and releases any resources it has allocated diff --git src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 0d47d14..59fc3db 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -122,11 +122,27 @@ public class KeyValueHeap extends NonLazyKeyValueScanner * @return true if there are more keys, false if all scanners are done */ public boolean next(List result, int limit) throws IOException { + return next(result, limit, null); + } + + /** + * Gets the next row of keys from the top-most scanner. + *

+ * This method takes care of updating the heap. + *

+ * This can ONLY be called when you are using Scanners that implement + * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}). + * @param result output result list + * @param limit limit on row count to get + * @param metric the metric name + * @return true if there are more keys, false if all scanners are done + */ + public boolean next(List result, int limit, String metric) throws IOException { if (this.current == null) { return false; } InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean mayContainMoreRows = currentAsInternal.next(result, limit); + boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric); KeyValue pee = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no @@ -158,6 +174,11 @@ public class KeyValueHeap extends NonLazyKeyValueScanner return next(result, -1); } + @Override + public boolean next(List result, String metric) throws IOException { + return next(result, -1, metric); + } + private static class KVScannerComparator implements Comparator { private KVComparator kvComparator; /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 6a4a440..919d814 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -50,8 +50,8 @@ class StoreScanner extends NonLazyKeyValueScanner private KeyValueHeap heap; private boolean cacheBlocks; - private String metricNameGetSize; + private String metricNamePrefix; // Used to indicate that the scanner has closed (see HBASE-1107) // Doesnt need to be volatile because it's always accessed via synchronized methods private boolean closing = false; @@ -198,7 +198,7 @@ class StoreScanner extends NonLazyKeyValueScanner /** * Method used internally to initialize metric names throughout the * constructors. 
- * + * * To be called after the store variable has been initialized! */ private void initializeMetricNames() { @@ -208,8 +208,8 @@ class StoreScanner extends NonLazyKeyValueScanner tableName = store.getTableName(); family = Bytes.toString(store.getFamily().getName()); } - metricNameGetSize = SchemaMetrics.generateSchemaMetricsPrefix( - tableName, family) + "getsize"; + this.metricNamePrefix = + SchemaMetrics.generateSchemaMetricsPrefix(tableName, family); } /** @@ -308,6 +308,18 @@ class StoreScanner extends NonLazyKeyValueScanner */ @Override public synchronized boolean next(List outResult, int limit) throws IOException { + return next(outResult, limit, null); + } + + /** + * Get the next row of values from this Store. + * @param outResult + * @param limit + * @return true if there are more rows, false if scanner is done + */ + @Override + public synchronized boolean next(List outResult, int limit, + String metric) throws IOException { if (checkReseek()) { return true; @@ -355,7 +367,15 @@ class StoreScanner extends NonLazyKeyValueScanner case INCLUDE_AND_SEEK_NEXT_COL: Filter f = matcher.getFilter(); - results.add(f == null ? 
kv : f.transform(kv)); + if (f != null) { + kv = f.transform(kv); + } + results.add(kv); + + if (metric != null) { + HRegion.incrNumericMetric(this.metricNamePrefix + metric, + kv.getLength()); + } if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { if (!matcher.moreRowsMayExistAfter(kv)) { @@ -369,7 +389,6 @@ class StoreScanner extends NonLazyKeyValueScanner this.heap.next(); } - HRegion.incrNumericMetric(metricNameGetSize, kv.getLength()); if (limit > 0 && (results.size() == limit)) { break LOOP; } @@ -434,7 +453,13 @@ class StoreScanner extends NonLazyKeyValueScanner @Override public synchronized boolean next(List outResult) throws IOException { - return next(outResult, -1); + return next(outResult, -1, null); + } + + @Override + public synchronized boolean next(List outResult, String metric) + throws IOException { + return next(outResult, -1, metric); } // Implementation of ChangedReadersObserver diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 545d107..f516f8c 100644 --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -63,11 +63,23 @@ public class TestCoprocessorInterface extends HBaseTestCase { public boolean next(List results) throws IOException { return delegate.next(results); } + + @Override + public boolean next(List results, String metric) + throws IOException { + return delegate.next(results, metric); + } @Override public boolean next(List result, int limit) throws IOException { return delegate.next(result, limit); } + + @Override + public boolean next(List result, int limit, String metric) + throws IOException { + return delegate.next(result, limit, metric); + } @Override public void close() throws IOException { diff --git 
src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 1b3b6df..60f21c0 100644 --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -299,13 +299,26 @@ public class TestRegionObserverInterface { public boolean next(List results) throws IOException { return next(results, -1); } + + @Override + public boolean next(List results, String metric) + throws IOException { + return next(results, -1, metric); + } + + @Override + public boolean next(List results, int limit) + throws IOException{ + return next(results, limit, null); + } @Override - public boolean next(List results, int limit) throws IOException { + public boolean next(List results, int limit, String metric) + throws IOException { List internalResults = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(internalResults, limit); + hasMore = scanner.next(internalResults, limit, metric); if (!internalResults.isEmpty()) { long row = Bytes.toLong(internalResults.get(0).getRow()); if (row % 2 == 0) { diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 6560672..d2fd2ff 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -27,7 +27,13 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import 
org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics. StoreMetricType; @@ -57,7 +63,7 @@ public class TestRegionServerMetrics { private static final SchemaMetrics ALL_METRICS = SchemaMetrics.ALL_SCHEMA_METRICS; - private static final HBaseTestingUtility TEST_UTIL = + private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private Map startingMetrics; @@ -131,5 +137,80 @@ public class TestRegionServerMetrics { @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); + + private void assertSizeMetric(String table, String[] cfs, int[] metrics) { + // we have getsize & nextsize for each column family + assertEquals(cfs.length * 2, metrics.length); + + for (int i =0; i < cfs.length; ++i) { + String prefix = SchemaMetrics.generateSchemaMetricsPrefix(table, cfs[i]); + String getMetric = prefix + HRegion.METRIC_GETSIZE; + String nextMetric = prefix + HRegion.METRIC_NEXTSIZE; + + // verify getsize and nextsize matches + int getSize = HRegion.numericMetrics.containsKey(getMetric) ? + HRegion.numericMetrics.get(getMetric).intValue() : 0; + int nextSize = HRegion.numericMetrics.containsKey(nextMetric) ? 
+ HRegion.numericMetrics.get(nextMetric).intValue() : 0; + + assertEquals(metrics[i], getSize); + assertEquals(metrics[cfs.length + i], nextSize); + } + } + + @Test + public void testGetNextSize() throws IOException, InterruptedException { + String rowName = "row1"; + byte[] ROW = Bytes.toBytes(rowName); + String tableName = "SizeMetricTest"; + byte[] TABLE = Bytes.toBytes(tableName); + String cf1Name = "cf1"; + String cf2Name = "cf2"; + String[] cfs = new String[] {cf1Name, cf2Name}; + byte[] CF1 = Bytes.toBytes(cf1Name); + byte[] CF2 = Bytes.toBytes(cf2Name); + + long ts = 1234; + HTable hTable = TEST_UTIL.createTable(TABLE, new byte[][]{CF1, CF2}); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + + Put p = new Put(ROW); + p.add(CF1, CF1, ts, CF1); + p.add(CF2, CF2, ts, CF2); + hTable.put(p); + + KeyValue kv1 = new KeyValue(ROW, CF1, CF1, ts, CF1); + KeyValue kv2 = new KeyValue(ROW, CF2, CF2, ts, CF2); + int kvLength = kv1.getLength(); + assertEquals(kvLength, kv2.getLength()); + + // only cf1.getsize is set on Get + hTable.get(new Get(ROW).addFamily(CF1)); + assertSizeMetric(tableName, cfs, new int[] {kvLength, 0, 0, 0}); + + // only cf2.getsize is set on Get + hTable.get(new Get(ROW).addFamily(CF2)); + assertSizeMetric(tableName, cfs, new int[] {kvLength, kvLength, 0, 0}); + + // only cf2.nextsize is set + for (Result res : hTable.getScanner(CF2)) { + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, 0, kvLength}); + + // only cf2.nextsize is set + for (Result res : hTable.getScanner(CF1)) { + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, kvLength, kvLength}); + + // getsize/nextsize should not be set on flush or compaction + for (HRegion hr : TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE)) { + hr.flushcache(); + hr.compactStores(); + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, kvLength, kvLength}); + } } -- 1.7.4.4