From b28dd9d9373f4b9a7e1a2331e73b5a8e90c46c2a Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Mon, 8 Apr 2013 17:04:51 -0700 Subject: [PATCH] Starting to add metrics for scan and get size. --- .../regionserver/MetricsRegionServerSource.java | 1 + .../hbase/regionserver/MetricsRegionSource.java | 15 +- .../regionserver/MetricsRegionSourceImpl.java | 28 ++- .../regionserver/MetricsRegionSourceImpl.java | 32 +++- .../apache/hadoop/hbase/regionserver/HRegion.java | 66 +++---- .../hadoop/hbase/regionserver/InternalScanner.java | 20 --- .../hadoop/hbase/regionserver/KeyValueHeap.java | 23 +-- .../hadoop/hbase/regionserver/MetricsRegion.java | 8 +- .../hadoop/hbase/regionserver/RegionScanner.java | 5 +- .../hadoop/hbase/regionserver/StoreScanner.java | 183 ++++++++------------ .../coprocessor/TestCoprocessorInterface.java | 20 +-- .../coprocessor/TestRegionObserverInterface.java | 14 +- .../regionserver/TestRegionServerMetrics.java | 44 ++++- 13 files changed, 226 insertions(+), 233 deletions(-) diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index 1436b13..4a14849 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -188,6 +188,7 @@ public interface MetricsRegionServerSource extends BaseSource { static final String INCREMENT_KEY = "increment"; static final String MUTATE_KEY = "mutate"; static final String APPEND_KEY = "append"; + static final String SCAN_NEXT_KEY = "scanNext"; static final String SLOW_MUTATE_KEY = "slowPutCount"; static final String SLOW_GET_KEY = "slowGetCount"; static final String SLOW_DELETE_KEY = "slowDeleteCount"; diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java index 0bc14c3..901473d 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java @@ -25,6 +25,9 @@ package org.apache.hadoop.hbase.regionserver; */ public interface MetricsRegionSource extends Comparable { + public static final String OPS_SAMPLE_NAME = "ops"; + public static final String SIZE_VALUE_NAME = "size"; + /** * Close the region's metrics as this region is closing. */ @@ -41,11 +44,17 @@ public interface MetricsRegionSource extends Comparable { void updateDelete(); /** - * Update related counts of gets. + * Update count and sizes of gets. + * @param getSize size in bytes of the resulting key values for a get */ - void updateGet(); + void updateGet(long getSize); /** + * Update the count and sizes of resultScanner.next() + * @param scanSize Size in bytes of the resulting key values for a next() + */ + void updateScan(long scanSize); + /** * Update related counts of increments. */ void updateIncrement(); @@ -59,4 +68,6 @@ public interface MetricsRegionSource extends Comparable { * Get the aggregate source to which this reports. */ MetricsRegionAggregateSource getAggregateSource(); + + } diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java index 329b667..385c645 100644 --- hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java @@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.impl.JmxCacheBuster; import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong; +import org.apache.hadoop.metrics2.lib.MetricMutableStat; public class MetricsRegionSourceImpl implements MetricsRegionSource { @@ -39,12 +40,15 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private String regionGetKey; private String regionIncrementKey; private String regionAppendKey; + private String regionScanNextKey; private MetricMutableCounterLong regionPut; private MetricMutableCounterLong regionDelete; - private MetricMutableCounterLong regionGet; private MetricMutableCounterLong regionIncrement; private MetricMutableCounterLong regionAppend; + private MetricMutableStat regionGet; + private MetricMutableStat regionScanNext; + public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper, MetricsRegionAggregateSourceImpl aggregate) { this.regionWrapper = regionWrapper; @@ -70,14 +74,17 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { regionDeleteKey = regionNamePrefix + MetricsRegionServerSource.DELETE_KEY + suffix; regionDelete = registry.getLongCounter(regionDeleteKey, 0l); - regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY + suffix; - regionGet = registry.getLongCounter(regionGetKey, 0l); - regionIncrementKey = regionNamePrefix + MetricsRegionServerSource.INCREMENT_KEY + suffix; regionIncrement = registry.getLongCounter(regionIncrementKey, 0l); regionAppendKey = regionNamePrefix + MetricsRegionServerSource.APPEND_KEY + suffix; regionAppend = registry.getLongCounter(regionAppendKey, 0l); + + regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY; + regionGet = registry.newStat(regionGetKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME); + + regionScanNextKey = regionNamePrefix + MetricsRegionServerSource.SCAN_NEXT_KEY; + regionScanNext = registry.newStat(regionScanNextKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME); } @Override @@ -88,11 +95,13 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { LOG.trace("Removing region Metrics: " + regionWrapper.getRegionName()); registry.removeMetric(regionPutKey); registry.removeMetric(regionDeleteKey); - registry.removeMetric(regionGetKey); registry.removeMetric(regionIncrementKey); registry.removeMetric(regionAppendKey); + registry.removeMetric(regionGetKey); + registry.removeMetric(regionScanNextKey); + JmxCacheBuster.clearJmxCache(); } @@ -107,8 +116,13 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { } @Override - public void updateGet() { - regionGet.incr(); + public void updateGet(long getSize) { + regionGet.add(getSize); + } + + @Override + public void updateScan(long scanSize) { + regionScanNext.add(scanSize); } @Override diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java index 9f2d749..b6a270a 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java @@ -20,16 +20,18 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.regionserver.MetricsRegionSourceImpl; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.impl.JmxCacheBuster; import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableStat; public class MetricsRegionSourceImpl implements MetricsRegionSource { private final MetricsRegionWrapper regionWrapper; + + private boolean closed = false; private MetricsRegionAggregateSourceImpl agg; private DynamicMetricsRegistry registry; @@ -41,12 +43,15 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private String regionGetKey; private String regionIncrementKey; private String regionAppendKey; + private String regionScanNextKey; private MutableCounterLong regionPut; private MutableCounterLong regionDelete; - private MutableCounterLong regionGet; + private MutableCounterLong regionIncrement; private MutableCounterLong regionAppend; + private MutableStat regionGet; + private MutableStat regionScanNext; public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper, MetricsRegionAggregateSourceImpl aggregate) { @@ -72,14 +77,17 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { regionDeleteKey = regionNamePrefix + MetricsRegionServerSource.DELETE_KEY + suffix; regionDelete = registry.getLongCounter(regionDeleteKey, 0l); - regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY + suffix; - regionGet = registry.getLongCounter(regionGetKey, 0l); - regionIncrementKey = regionNamePrefix + MetricsRegionServerSource.INCREMENT_KEY + suffix; regionIncrement = registry.getLongCounter(regionIncrementKey, 0l); regionAppendKey = regionNamePrefix + MetricsRegionServerSource.APPEND_KEY + suffix; regionAppend = registry.getLongCounter(regionAppendKey, 0l); + + regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY; + regionGet = registry.newStat(regionGetKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME); + + regionScanNextKey = regionNamePrefix + MetricsRegionServerSource.SCAN_NEXT_KEY; + regionScanNext = registry.newStat(regionScanNextKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME); } @Override @@ -90,11 +98,14 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { LOG.trace("Removing region Metrics: " + regionWrapper.getRegionName()); registry.removeMetric(regionPutKey); registry.removeMetric(regionDeleteKey); - registry.removeMetric(regionGetKey); + registry.removeMetric(regionIncrementKey); registry.removeMetric(regionAppendKey); + registry.removeMetric(regionGetKey); + registry.removeMetric(regionScanNextKey); + JmxCacheBuster.clearJmxCache(); } @@ -109,8 +120,13 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { } @Override - public void updateGet() { - regionGet.incr(); + public void updateGet(long getSize) { + regionGet.add(getSize); + } + + @Override + public void updateScan(long scanSize) { + regionScanNext.add(scanSize); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c887e96..b4ccb26 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3421,14 +3421,14 @@ public class HRegion implements HeapSize { // , Writable{ } @Override - public boolean next(List outResults, int limit) + public boolean next(List outResults) throws IOException { - return next(outResults, limit, null); + // apply the batching limit by default + return next(outResults, batch); } @Override - public synchronized boolean next(List outResults, int limit, - String metric) throws IOException { + public synchronized boolean next(List outResults, int limit) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. Could be caused by a very slow scanner " + @@ -3441,7 +3441,7 @@ public class HRegion implements HeapSize { // , Writable{ // This could be a new thread from the last time we called next(). MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); - return nextRaw(outResults, limit, metric); + return nextRaw(outResults, limit); } finally { closeRegionOperation(); } @@ -3450,49 +3450,44 @@ public class HRegion implements HeapSize { // , Writable{ @Override public boolean nextRaw(List outResults) throws IOException { - return nextRaw(outResults, batch, null); + return nextRaw(outResults, batch); } @Override - public boolean nextRaw(List outResults, int limit, - String metric) throws IOException { + public boolean nextRaw(List outResults, int limit) throws IOException { boolean returnResult; if (outResults.isEmpty()) { // Usually outResults is empty. This is true when next is called // to handle scan or get operation. - returnResult = nextInternal(outResults, limit, metric); + returnResult = nextInternal(outResults, limit); } else { List tmpList = new ArrayList(); - returnResult = nextInternal(tmpList, limit, metric); + returnResult = nextInternal(tmpList, limit); outResults.addAll(tmpList); } resetFilters(); if (isFilterDone()) { return false; } + if (region != null && region.metricsRegion != null) { + long totalSize = 0; + if (outResults != null) { + for(KeyValue kv:outResults) { + totalSize += kv.getLength(); + } + } + region.metricsRegion.updateScanNext(totalSize); + } return returnResult; } - @Override - public boolean next(List outResults) - throws IOException { - // apply the batching limit by default - return next(outResults, batch, null); - } - - @Override - public boolean next(List outResults, String metric) - throws IOException { - // apply the batching limit by default - return next(outResults, batch, metric); - } - private void populateFromJoinedHeap(List results, int limit, String metric) + private void populateFromJoinedHeap(List results, int limit) throws IOException { assert joinedContinuationRow != null; KeyValue kv = populateResult(results, this.joinedHeap, limit, joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(), - joinedContinuationRow.getRowLength(), metric); + joinedContinuationRow.getRowLength()); if (kv != KV_LIMIT) { // We are done with this row, reset the continuation. joinedContinuationRow = null; @@ -3510,14 +3505,13 @@ public class HRegion implements HeapSize { // , Writable{ * @param currentRow Byte array with key we are fetching. * @param offset offset for currentRow * @param length length for currentRow - * @param metric Metric key to be passed into KeyValueHeap::next(). * @return KV_LIMIT if limit reached, next KeyValue otherwise. */ private KeyValue populateResult(List results, KeyValueHeap heap, int limit, - byte[] currentRow, int offset, short length, String metric) throws IOException { + byte[] currentRow, int offset, short length) throws IOException { KeyValue nextKv; do { - heap.next(results, limit - results.size(), metric); + heap.next(results, limit - results.size()); if (limit > 0 && results.size() == limit) { return KV_LIMIT; } @@ -3534,7 +3528,7 @@ public class HRegion implements HeapSize { // , Writable{ return this.filter != null && this.filter.filterAllRemaining(); } - private boolean nextInternal(List results, int limit, String metric) + private boolean nextInternal(List results, int limit) throws IOException { if (!results.isEmpty()) { throw new IllegalArgumentException("First parameter should be an empty list"); @@ -3587,7 +3581,7 @@ public class HRegion implements HeapSize { // , Writable{ } KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset, - length, metric); + length); // Ok, we are good, let's try to get some results from the main heap. if (nextKv == KV_LIMIT) { if (this.filter != null && filter.hasFilterRow()) { @@ -3632,12 +3626,12 @@ public class HRegion implements HeapSize { // , Writable{ && joinedHeap.peek().matchingRow(currentRow, offset, length)); if (mayHaveData) { joinedContinuationRow = current; - populateFromJoinedHeap(results, limit, metric); + populateFromJoinedHeap(results, limit); } } } else { // Populating from the joined heap was stopped by limits, populate some more. - populateFromJoinedHeap(results, limit, metric); + populateFromJoinedHeap(results, limit); } // We may have just called populateFromJoinedMap and hit the limits. If that is @@ -4347,7 +4341,13 @@ public class HRegion implements HeapSize { // , Writable{ // do after lock if (this.metricsRegion != null) { - this.metricsRegion.updateGet(); + long totalSize = 0l; + if (results != null) { + for (KeyValue kv:results) { + totalSize += kv.getLength(); + } + } + this.metricsRegion.updateGet(totalSize); } return results; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index 728f1ad..e4be0a7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -48,15 +48,6 @@ public interface InternalScanner extends Closeable { * @throws IOException e */ public boolean next(List results) throws IOException; - - /** - * Grab the next row's worth of values. - * @param results return output array - * @param metric the metric name - * @return true if more rows exist after this one, false if scanner is done - * @throws IOException e - */ - public boolean next(List results, String metric) throws IOException; /** * Grab the next row's worth of values with a limit on the number of values @@ -67,17 +58,6 @@ public interface InternalScanner extends Closeable { * @throws IOException e */ public boolean next(List result, int limit) throws IOException; - - /** - * Grab the next row's worth of values with a limit on the number of values - * to return. - * @param result return output array - * @param limit limit on row count to get - * @param metric the metric name - * @return true if more rows exist after this one, false if scanner is done - * @throws IOException e - */ - public boolean next(List result, int limit, String metric) throws IOException; /** * Closes the scanner and releases any resources it has allocated diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index ed501dc..9d2aceb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -121,27 +121,11 @@ public class KeyValueHeap extends NonLazyKeyValueScanner * @return true if there are more keys, false if all scanners are done */ public boolean next(List result, int limit) throws IOException { - return next(result, limit, null); - } - - /** - * Gets the next row of keys from the top-most scanner. - *

- * This method takes care of updating the heap. - *

- * This can ONLY be called when you are using Scanners that implement - * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}). - * @param result output result list - * @param limit limit on row count to get - * @param metric the metric name - * @return true if there are more keys, false if all scanners are done - */ - public boolean next(List result, int limit, String metric) throws IOException { if (this.current == null) { return false; } InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric); + boolean mayContainMoreRows = currentAsInternal.next(result, limit); KeyValue pee = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no @@ -173,11 +157,6 @@ public class KeyValueHeap extends NonLazyKeyValueScanner return next(result, -1); } - @Override - public boolean next(List result, String metric) throws IOException { - return next(result, -1, metric); - } - private static class KVScannerComparator implements Comparator { private KVComparator kvComparator; /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java index 7079530..fd62f03 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java @@ -48,8 +48,12 @@ public class MetricsRegion { source.updateDelete(); } - public void updateGet() { - source.updateGet(); + public void updateGet(long getSize) { + source.updateGet(getSize); + } + + public void updateScanNext(long scanSize) { + source.updateScan(scanSize); } public void updateAppend() { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index f71625a..e470476 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -69,7 +69,7 @@ public interface RegionScanner extends InternalScanner { * to return. * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. * Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object. - * See {@link #nextRaw(List, int, String)} + * See {@link #nextRaw(List, int)} * @param result return output array * @return true if more rows exist after this one, false if scanner is done * @throws IOException e @@ -99,9 +99,8 @@ public interface RegionScanner extends InternalScanner { * * @param result return output array * @param limit limit on row count to get - * @param metric the metric name * @return true if more rows exist after this one, false if scanner is done * @throws IOException e */ - public boolean nextRaw(List result, int limit, String metric) throws IOException; + public boolean nextRaw(List result, int limit) throws IOException; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 6d0dc29..fe77b11 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -353,19 +353,6 @@ public class StoreScanner extends NonLazyKeyValueScanner */ @Override public synchronized boolean next(List outResult, int limit) throws IOException { - return next(outResult, limit, null); - } - - /** - * Get the next row of values from this Store. - * @param outResult - * @param limit - * @return true if there are more rows, false if scanner is done - */ - @Override - public synchronized boolean next(List outResult, int limit, - String metric) throws IOException { - if (checkReseek()) { return true; } @@ -401,104 +388,94 @@ public class StoreScanner extends NonLazyKeyValueScanner KeyValue.KVComparator comparator = store != null ? store.getComparator() : null; - long cumulativeMetric = 0; int count = 0; - try { - LOOP: while((kv = this.heap.peek()) != null) { - // Check that the heap gives us KVs in an increasing order. - assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : - "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; - prevKV = kv; - - ScanQueryMatcher.MatchCode qcode = matcher.match(kv); - switch(qcode) { - case INCLUDE: - case INCLUDE_AND_SEEK_NEXT_ROW: - case INCLUDE_AND_SEEK_NEXT_COL: - - Filter f = matcher.getFilter(); - if (f != null) { - kv = f.transform(kv); - } - - this.countPerRow++; - if (storeLimit > -1 && - this.countPerRow > (storeLimit + storeOffset)) { - // do what SEEK_NEXT_ROW does. - if (!matcher.moreRowsMayExistAfter(kv)) { - return false; - } - reseek(matcher.getKeyForNextRow(kv)); - break LOOP; - } - - // add to results only if we have skipped #storeOffset kvs - // also update metric accordingly - if (this.countPerRow > storeOffset) { - if (metric != null) { - cumulativeMetric += kv.getLength(); - } - outResult.add(kv); - count++; - } - - if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (!matcher.moreRowsMayExistAfter(kv)) { - return false; - } - reseek(matcher.getKeyForNextRow(kv)); - } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { - reseek(matcher.getKeyForNextColumn(kv)); - } else { - this.heap.next(); - } - - if (limit > 0 && (count == limit)) { - break LOOP; + LOOP: while((kv = this.heap.peek()) != null) { + // Check that the heap gives us KVs in an increasing order. + assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : + "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; + prevKV = kv; + + ScanQueryMatcher.MatchCode qcode = matcher.match(kv); + switch(qcode) { + case INCLUDE: + case INCLUDE_AND_SEEK_NEXT_ROW: + case INCLUDE_AND_SEEK_NEXT_COL: + + Filter f = matcher.getFilter(); + if (f != null) { + kv = f.transform(kv); + } + + this.countPerRow++; + if (storeLimit > -1 && + this.countPerRow > (storeLimit + storeOffset)) { + // do what SEEK_NEXT_ROW does. + if (!matcher.moreRowsMayExistAfter(kv)) { + return false; } - continue; + reseek(matcher.getKeyForNextRow(kv)); + break LOOP; + } - case DONE: - return true; + // add to results only if we have skipped #storeOffset kvs + // also update metric accordingly + if (this.countPerRow > storeOffset) { + outResult.add(kv); + count++; + } - case DONE_SCAN: - close(); - return false; - - case SEEK_NEXT_ROW: - // This is just a relatively simple end of scan fix, to short-cut end - // us if there is an endKey in the scan. + if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { if (!matcher.moreRowsMayExistAfter(kv)) { return false; } - reseek(matcher.getKeyForNextRow(kv)); - break; - - case SEEK_NEXT_COL: + } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { reseek(matcher.getKeyForNextColumn(kv)); - break; - - case SKIP: + } else { this.heap.next(); - break; - - case SEEK_NEXT_USING_HINT: - KeyValue nextKV = matcher.getNextKeyHint(kv); - if (nextKV != null) { - reseek(nextKV); - } else { - heap.next(); - } - break; + } - default: - throw new RuntimeException("UNEXPECTED"); - } - } - } finally { - if (cumulativeMetric > 0 && metric != null) { + if (limit > 0 && (count == limit)) { + break LOOP; + } + continue; + + case DONE: + return true; + + case DONE_SCAN: + close(); + return false; + case SEEK_NEXT_ROW: + // This is just a relatively simple end of scan fix, to short-cut end + // us if there is an endKey in the scan. + if (!matcher.moreRowsMayExistAfter(kv)) { + return false; + } + + reseek(matcher.getKeyForNextRow(kv)); + break; + + case SEEK_NEXT_COL: + reseek(matcher.getKeyForNextColumn(kv)); + break; + + case SKIP: + this.heap.next(); + break; + + case SEEK_NEXT_USING_HINT: + KeyValue nextKV = matcher.getNextKeyHint(kv); + if (nextKV != null) { + reseek(nextKV); + } else { + heap.next(); + } + break; + + default: + throw new RuntimeException("UNEXPECTED"); } } @@ -513,13 +490,7 @@ public class StoreScanner extends NonLazyKeyValueScanner @Override public synchronized boolean next(List outResult) throws IOException { - return next(outResult, -1, null); - } - - @Override - public synchronized boolean next(List outResult, String metric) - throws IOException { - return next(outResult, -1, metric); + return next(outResult, -1); } // Implementation of ChangedReadersObserver diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 00b0fda..34c3ce8 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -78,32 +78,20 @@ public class TestCoprocessorInterface extends HBaseTestCase { } @Override - public boolean next(List results, String metric) - throws IOException { - return delegate.next(results, metric); - } - - @Override public boolean next(List result, int limit) throws IOException { return delegate.next(result, limit); } @Override - public boolean next(List result, int limit, String metric) - throws IOException { - return delegate.next(result, limit, metric); - } - - @Override - public boolean nextRaw(List result, int limit, String metric) + public boolean nextRaw(List result) throws IOException { - return delegate.nextRaw(result, limit, metric); + return delegate.nextRaw(result); } @Override - public boolean nextRaw(List result) + public boolean nextRaw(List result, int limit) throws IOException { - return delegate.nextRaw(result); + return delegate.nextRaw(result, limit); } @Override diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 72e9726..36262a0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -324,24 +324,12 @@ public class TestRegionObserverInterface { } @Override - public boolean next(List results, String metric) - throws IOException { - return next(results, -1, metric); - } - - @Override public boolean next(List results, int limit) throws IOException{ - return next(results, limit, null); - } - - @Override - public boolean next(List results, int limit, String metric) - throws IOException { List internalResults = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(internalResults, limit, metric); + hasMore = scanner.next(internalResults, limit); if (!internalResults.isEmpty()) { long row = Bytes.toLong(internalResults.get(0).getRow()); if (row % 2 == 0) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index a40a0d5..e6d4244 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -31,6 +31,9 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.*; + +import java.io.IOException; @Category(MediumTests.class) @@ -139,7 +142,7 @@ public class TestRegionServerMetrics { .getSource() .getAggregateSource(); String prefix = "table."+tableNameString + ".region." + i.getEncodedName(); - metricsHelper.assertCounter(prefix + ".getCount", 10, agg); + metricsHelper.assertCounter(prefix + ".getNumOps", 10, agg); metricsHelper.assertCounter(prefix + ".mutateCount", 30, agg); } @@ -309,4 +312,43 @@ public class TestRegionServerMetrics { t.close(); } + + @Test + public void testScanNext() throws IOException { + String tableNameString = "testScanNext"; + byte[] tableName = Bytes.toBytes(tableNameString); + byte[] cf = Bytes.toBytes("d"); + byte[] qualifier = Bytes.toBytes("qual"); + byte[] val = Bytes.toBytes("One"); + + + TEST_UTIL.createTable(tableName, cf); + HTable t = new HTable(conf, tableName); + t.setAutoFlush(false); + for (int insertCount =0; insertCount < 100; insertCount++) { + Put p = new Put(Bytes.toBytes("" + insertCount + "row")); + p.add(cf, qualifier, val); + t.put(p); + } + t.flushCommits(); + + Scan s = new Scan(); + s.setBatch(1); + s.setCaching(1); + ResultScanner resultScanners = t.getScanner(s); + + for (int nextCount = 0; nextCount < 30; nextCount++) { + Result result = resultScanners.next(); + assertNotNull(result); + assertEquals(1, result.size()); + } + for ( HRegionInfo i:t.getRegionLocations().keySet()) { + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "table."+tableNameString + ".region." + i.getEncodedName(); + metricsHelper.assertCounter(prefix + ".scanNextNumOps", 30, agg); + } + } } -- 1.7.10.2 (Apple Git-33)