Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (revision 1377139) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (working copy) @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Append; @@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics. @@ -69,7 +71,7 @@ private static final SchemaMetrics ALL_METRICS = SchemaMetrics.ALL_SCHEMA_METRICS; - private static final HBaseTestingUtility TEST_UTIL = + private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private Map startingMetrics; @@ -270,5 +272,80 @@ @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); + + private void assertSizeMetric(String table, String[] cfs, int[] metrics) { + // we have getsize & nextsize for each column family + assertEquals(cfs.length * 2, metrics.length); + + for (int i =0; i < cfs.length; ++i) { + String prefix = SchemaMetrics.generateSchemaMetricsPrefix(table, cfs[i]); + String getMetric = prefix + HRegion.METRIC_GETSIZE; + String nextMetric = prefix + HRegion.METRIC_NEXTSIZE; + + // verify getsize and nextsize matches + int getSize = RegionMetricsStorage.getNumericMetrics().containsKey(getMetric) ? + RegionMetricsStorage.getNumericMetrics().get(getMetric).intValue() : 0; + int nextSize = RegionMetricsStorage.getNumericMetrics().containsKey(nextMetric) ? + RegionMetricsStorage.getNumericMetrics().get(nextMetric).intValue() : 0; + + assertEquals(metrics[i], getSize); + assertEquals(metrics[cfs.length + i], nextSize); + } + } + + @Test + public void testGetNextSize() throws IOException, InterruptedException { + String rowName = "row1"; + byte[] ROW = Bytes.toBytes(rowName); + String tableName = "SizeMetricTest"; + byte[] TABLE = Bytes.toBytes(tableName); + String cf1Name = "cf1"; + String cf2Name = "cf2"; + String[] cfs = new String[] {cf1Name, cf2Name}; + byte[] CF1 = Bytes.toBytes(cf1Name); + byte[] CF2 = Bytes.toBytes(cf2Name); + + long ts = 1234; + HTable hTable = TEST_UTIL.createTable(TABLE, new byte[][]{CF1, CF2}); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + + Put p = new Put(ROW); + p.add(CF1, CF1, ts, CF1); + p.add(CF2, CF2, ts, CF2); + hTable.put(p); + + KeyValue kv1 = new KeyValue(ROW, CF1, CF1, ts, CF1); + KeyValue kv2 = new KeyValue(ROW, CF2, CF2, ts, CF2); + int kvLength = kv1.getLength(); + assertEquals(kvLength, kv2.getLength()); + + // only cf1.getsize is set on Get + hTable.get(new Get(ROW).addFamily(CF1)); + assertSizeMetric(tableName, cfs, new int[] {kvLength, 0, 0, 0}); + + // only cf2.getsize is set on Get + hTable.get(new Get(ROW).addFamily(CF2)); + assertSizeMetric(tableName, cfs, new int[] {kvLength, kvLength, 0, 0}); + + // only cf2.nextsize is set + for (Result res : hTable.getScanner(CF2)) { + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, 0, kvLength}); + + // only cf2.nextsize is set + for (Result res : hTable.getScanner(CF1)) { + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, kvLength, kvLength}); + + // getsize/nextsize should not be set on flush or compaction + for (HRegion hr : TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE)) { + hr.flushcache(); + hr.compactStores(); + } + assertSizeMetric(tableName, cfs, + new int[] {kvLength, kvLength, kvLength, kvLength}); + } } Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (revision 1377139) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (working copy) @@ -304,13 +304,26 @@ public boolean next(List results) throws IOException { return next(results, -1); } + + @Override + public boolean next(List results, String metric) + throws IOException { + return next(results, -1, metric); + } @Override - public boolean next(List results, int limit) throws IOException { + public boolean next(List results, int limit) + throws IOException{ + return next(results, limit, null); + } + + @Override + public boolean next(List results, int limit, String metric) + throws IOException { List internalResults = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(internalResults, limit); + hasMore = scanner.next(internalResults, limit, metric); if (!internalResults.isEmpty()) { long row = Bytes.toLong(internalResults.get(0).getRow()); if (row % 2 == 0) { Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (revision 1377139) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (working copy) @@ -66,11 +66,23 @@ public boolean next(List results) throws IOException { return delegate.next(results); } + + @Override + public boolean next(List results, String metric) + throws IOException { + return delegate.next(results, metric); + } @Override public boolean next(List result, int limit) throws IOException { return delegate.next(result, limit); } + + @Override + public boolean next(List result, int limit, String metric) + throws IOException { + return delegate.next(result, limit, metric); + } @Override public void close() throws IOException { Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1377139) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -120,11 +120,27 @@ * @return true if there are more keys, false if all scanners are done */ public boolean next(List result, int limit) throws IOException { + return next(result, limit, null); + } + + /** + * Gets the next row of keys from the top-most scanner. + *

+ * This method takes care of updating the heap. + *

+ * This can ONLY be called when you are using Scanners that implement + * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}). + * @param result output result list + * @param limit limit on row count to get + * @param metric the metric name + * @return true if there are more keys, false if all scanners are done + */ + public boolean next(List result, int limit, String metric) throws IOException { if (this.current == null) { return false; } InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean mayContainMoreRows = currentAsInternal.next(result, limit); + boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric); KeyValue pee = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no @@ -156,6 +172,11 @@ return next(result, -1); } + @Override + public boolean next(List result, String metric) throws IOException { + return next(result, -1, metric); + } + private static class KVScannerComparator implements Comparator { private KVComparator kvComparator; /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (revision 1377139) +++ src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (working copy) @@ -47,6 +47,15 @@ * @throws IOException e */ public boolean next(List results) throws IOException; + + /** + * Grab the next row's worth of values. + * @param results return output array + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean next(List results, String metric) throws IOException; /** * Grab the next row's worth of values with a limit on the number of values @@ -57,6 +66,17 @@ * @throws IOException e */ public boolean next(List result, int limit) throws IOException; + + /** + * Grab the next row's worth of values with a limit on the number of values + * to return. + * @param result return output array + * @param limit limit on row count to get + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean next(List result, int limit, String metric) throws IOException; /** * Closes the scanner and releases any resources it has allocated Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1377139) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -50,8 +50,8 @@ private KeyValueHeap heap; private boolean cacheBlocks; - private String metricNameGetSize; + private String metricNamePrefix; // Used to indicate that the scanner has closed (see HBASE-1107) // Doesnt need to be volatile because it's always accessed via synchronized methods private boolean closing = false; @@ -198,7 +198,7 @@ /** * Method used internally to initialize metric names throughout the * constructors. - * + * * To be called after the store variable has been initialized! */ private void initializeMetricNames() { @@ -208,8 +208,8 @@ tableName = store.getTableName(); family = Bytes.toString(store.getFamily().getName()); } - metricNameGetSize = SchemaMetrics.generateSchemaMetricsPrefix( - tableName, family) + "getsize"; + this.metricNamePrefix = + SchemaMetrics.generateSchemaMetricsPrefix(tableName, family); } /** @@ -308,7 +308,19 @@ */ @Override public synchronized boolean next(List outResult, int limit) throws IOException { + return next(outResult, limit, null); + } + /** + * Get the next row of values from this Store. + * @param outResult + * @param limit + * @return true if there are more rows, false if scanner is done + */ + @Override + public synchronized boolean next(List outResult, int limit, + String metric) throws IOException { + if (checkReseek()) { return true; } @@ -420,7 +432,10 @@ } } } finally { - RegionMetricsStorage.incrNumericMetric(metricNameGetSize, cumulativeMetric); + if (cumulativeMetric > 0 && metric != null) { + RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric, + cumulativeMetric); + } } if (!results.isEmpty()) { @@ -436,9 +451,15 @@ @Override public synchronized boolean next(List outResult) throws IOException { - return next(outResult, -1); + return next(outResult, -1, null); } + @Override + public synchronized boolean next(List outResult, String metric) + throws IOException { + return next(outResult, -1, metric); + } + // Implementation of ChangedReadersObserver @Override public synchronized void updateReaders() throws IOException { Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1377139) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -44,7 +44,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -108,7 +107,6 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics; -import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -199,7 +197,7 @@ // Registered region protocol handlers private ClassToInstanceMap protocolHandlers = MutableClassToInstanceMap.create(); - + private Map> protocolHandlerNames = Maps.newHashMap(); @@ -333,6 +331,8 @@ private RegionSplitPolicy splitPolicy; private final OperationMetrics opMetrics; + public static final String METRIC_GETSIZE = "getsize"; + public static final String METRIC_NEXTSIZE = "nextsize"; /** * Should only be used for testing purposes @@ -914,7 +914,7 @@ CompletionService> completionService = new ExecutorCompletionService>( storeCloserThreadPool); - + // close each store in parallel for (final Store store : stores.values()) { completionService @@ -2943,7 +2943,7 @@ return currentEditSeqId; } finally { status.cleanup(); - if (reader != null) { + if (reader != null) { reader.close(); } } @@ -3389,6 +3389,12 @@ @Override public synchronized boolean next(List outResults, int limit) throws IOException { + return next(outResults, limit, null); + } + + @Override + public synchronized boolean next(List outResults, int limit, + String metric) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. Could be caused by a very slow scanner " + @@ -3403,7 +3409,7 @@ results.clear(); - boolean returnResult = nextInternal(limit); + boolean returnResult = nextInternal(limit, metric); outResults.addAll(results); resetFilters(); @@ -3420,9 +3426,16 @@ public synchronized boolean next(List outResults) throws IOException { // apply the batching limit by default - return next(outResults, batch); + return next(outResults, batch, null); } + @Override + public synchronized boolean next(List outResults, String metric) + throws IOException { + // apply the batching limit by default + return next(outResults, batch, metric); + } + /* * @return True if a filter rules the scanner is over, done. */ @@ -3430,7 +3443,7 @@ return this.filter != null && this.filter.filterAllRemaining(); } - private boolean nextInternal(int limit) throws IOException { + private boolean nextInternal(int limit, String metric) throws IOException { RpcCallContext rpcCall = HBaseServer.getCurrentCall(); while (true) { if (rpcCall != null) { @@ -3457,7 +3470,7 @@ } else { byte [] nextRow; do { - this.storeHeap.next(results, limit - results.size()); + this.storeHeap.next(results, limit - results.size(), metric); if (limit > 0 && results.size() == limit) { if (this.filter != null && filter.hasFilterRow()) { throw new IncompatibleFilterException( @@ -4169,7 +4182,7 @@ RegionScanner scanner = null; try { scanner = getScanner(scan); - scanner.next(results); + scanner.next(results, HRegion.METRIC_GETSIZE); } finally { if (scanner != null) scanner.close(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1377139) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2390,7 +2390,7 @@ && currentScanResultSize < maxScannerResultSize; i++) { requestCount.incrementAndGet(); // Collect values to be returned here - boolean moreRows = s.next(values); + boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE); if (!values.isEmpty()) { for (KeyValue kv : values) { currentScanResultSize += kv.heapSize();