diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 986a026..b253f82 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -772,6 +772,7 @@ public final class CellUtil {
     @Override
     public long heapSize() {
       long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell);
+      // this.tags is on heap byte[]
       if (this.tags != null) {
         sum += ClassSize.sizeOf(this.tags, this.tags.length);
       }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index 1bc9549..f5b5a09 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -244,7 +244,7 @@ public class OffheapKeyValue extends ByteBufferCell implements ExtendedCell {
 
   @Override
   public long heapSize() {
-    return ClassSize.align(FIXED_OVERHEAD + ClassSize.align(length));
+    return ClassSize.align(FIXED_OVERHEAD);
   }
 
   @Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 067a077..cef51d8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -667,7 +667,7 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
 
     @Override
     public long heapSize() {
-      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
+      return FIXED_OVERHEAD;
     }
 
     @Override
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
index da56e8c..addb9f1 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
@@ -569,7 +569,7 @@ public class PrefixTreeSeeker implements EncodedSeeker {
 
     @Override
     public long heapSize() {
-      return FIXED_OVERHEAD + rowLength + famLength + qualLength + valLength + tagsLength;
+      return FIXED_OVERHEAD;
     }
 
     @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c3db588..b82b1b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6060,6 +6060,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // progress.
       int initialBatchProgress = scannerContext.getBatchProgress();
       long initialSizeProgress = scannerContext.getSizeProgress();
+      long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
       long initialTimeProgress = scannerContext.getTimeProgress();
 
       // The loop here is used only when at some point during the next we determine
@@ -6073,7 +6074,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (scannerContext.getKeepProgress()) {
         // Progress should be kept. Reset to initial values seen at start of method invocation.
         scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-          initialTimeProgress);
+          initialHeapSizeProgress, initialTimeProgress);
       } else {
         scannerContext.clearProgress();
       }
@@ -6175,14 +6176,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           long timeProgress = scannerContext.getTimeProgress();
           if (scannerContext.getKeepProgress()) {
             scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-              initialTimeProgress);
+              initialHeapSizeProgress, initialTimeProgress);
           } else {
             scannerContext.clearProgress();
           }
           scannerContext.setTimeProgress(timeProgress);
           scannerContext.incrementBatchProgress(results.size());
           for (Cell cell : results) {
-            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+            scannerContext.incrementSizeProgress(CellUtil.estimatedSerializedSizeOf(cell),
+              CellUtil.estimatedHeapSizeOf(cell));
           }
         }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
index 3e0d7e8..46bd68e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -63,7 +63,7 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
-  void setSizeProgress(long sizeProgress) {
+  void setSizeProgress(long sizeProgress, long heapSizeProgress) {
     // Do nothing. NoLimitScannerContext instances are immutable post-construction
   }
 
@@ -78,7 +78,7 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
-  void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
+  void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
     // Do nothing. NoLimitScannerContext instances are immutable post-construction
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 6b2056a..984b965 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1181,7 +1181,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   Object addSize(RpcCallContext context, Result r, Object lastBlock) {
     if (context != null && r != null && !r.isEmpty()) {
       for (Cell c : r.rawCells()) {
-        context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
+        context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(c));
 
         // Since byte buffers can point all kinds of crazy places it's harder to keep track
        // of which blocks are kept alive by what byte buffer.
@@ -2835,7 +2835,9 @@
           // Configure with limits for this RPC. Set keep progress true since size progress
           // towards size limit should be kept between calls to nextRaw
           ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
-          contextBuilder.setSizeLimit(sizeScope, maxResultSize);
+          // maxResultSize - either we can reach this much size for all cells(being read) data or sum
+          // of heap size occupied by cells(being read). Cell data means its key and value parts.
+          contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
           contextBuilder.setBatchLimit(scanner.getBatch());
           contextBuilder.setTimeLimit(timeScope, timeLimit);
           contextBuilder.setTrackMetrics(trackMetrics);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index dc21628..0272861 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -104,7 +104,7 @@ public class ScannerContext {
     if (limitsToCopy != null) this.limits.copy(limitsToCopy);
 
     // Progress fields are initialized to 0
-    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
+    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
@@ -150,9 +150,11 @@ public class ScannerContext {
   /**
    * Progress towards the size limit has been made. Increment internal tracking of size progress
    */
-  void incrementSizeProgress(long size) {
+  void incrementSizeProgress(long size, long heapSize) {
     long currentSize = progress.getSize();
     progress.setSize(currentSize + size);
+    long curHeapSize = progress.getHeapSize();
+    progress.setHeapSize(curHeapSize + heapSize);
   }
 
   /**
@@ -170,18 +172,23 @@ public class ScannerContext {
     return progress.getSize();
   }
 
+  long getHeapSizeProgress() {
+    return progress.getHeapSize();
+  }
+
   long getTimeProgress() {
     return progress.getTime();
   }
 
-  void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
+  void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
     setBatchProgress(batchProgress);
-    setSizeProgress(sizeProgress);
+    setSizeProgress(sizeProgress, heapSizeProgress);
     setTimeProgress(timeProgress);
   }
 
-  void setSizeProgress(long sizeProgress) {
+  void setSizeProgress(long sizeProgress, long heapSizeProgress) {
     progress.setSize(sizeProgress);
+    progress.setHeapSize(heapSizeProgress);
   }
 
   void setBatchProgress(int batchProgress) {
@@ -197,7 +204,7 @@ public class ScannerContext {
    * values
    */
   void clearProgress() {
-    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
+    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
   }
 
   /**
@@ -240,7 +247,8 @@ public class ScannerContext {
    * @return true if the size limit can be enforced in the checker's scope
   */
   boolean hasSizeLimit(LimitScope checkerScope) {
-    return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
+    return limits.canEnforceSizeLimitFromScope(checkerScope)
+        && (limits.getSize() > 0 || limits.getHeapSize() > 0);
   }
 
   /**
@@ -298,7 +306,8 @@ public class ScannerContext {
    * @return true when the limit is enforceable from the checker's scope and it has been reached
   */
   boolean checkSizeLimit(LimitScope checkerScope) {
-    return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
+    return hasSizeLimit(checkerScope) && (progress.getSize() >= limits.getSize()
+        || progress.getHeapSize() >= limits.getHeapSize());
   }
 
   /**
@@ -370,9 +379,10 @@ public class ScannerContext {
       return this;
     }
 
-    public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
+    public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit, long heapSizeLimit) {
       limits.setSize(sizeLimit);
       limits.setSizeScope(sizeScope);
+      limits.setHeapSize(heapSizeLimit);
       return this;
     }
 
@@ -515,7 +525,8 @@ public class ScannerContext {
     int batch = DEFAULT_BATCH;
 
     LimitScope sizeScope = DEFAULT_SCOPE;
-    long size = DEFAULT_SIZE;
+    long size = DEFAULT_SIZE; // The sum of cell sizes(key + value)
+    long heapSize = DEFAULT_SIZE;
 
     LimitScope timeScope = DEFAULT_SCOPE;
     long time = DEFAULT_TIME;
@@ -526,14 +537,15 @@ public class ScannerContext {
     LimitFields() {
     }
 
-    LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
-      setFields(batch, sizeScope, size, timeScope, time);
+    LimitFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
+        long time) {
+      setFields(batch, sizeScope, size, heapSize, timeScope, time);
     }
 
     void copy(LimitFields limitsToCopy) {
       if (limitsToCopy != null) {
         setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
-          limitsToCopy.getTimeScope(), limitsToCopy.getTime());
+          limitsToCopy.getHeapSize(), limitsToCopy.getTimeScope(), limitsToCopy.getTime());
       }
     }
 
@@ -543,10 +555,12 @@ public class ScannerContext {
      * @param sizeScope
     * @param size
     */
-    void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
+    void setFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
+        long time) {
       setBatch(batch);
       setSizeScope(sizeScope);
       setSize(size);
+      setHeapSize(heapSize);
       setTimeScope(timeScope);
       setTime(time);
     }
@@ -571,10 +585,18 @@ public class ScannerContext {
       return this.size;
     }
 
+    long getHeapSize() {
+      return this.heapSize;
+    }
+
     void setSize(long size) {
       this.size = size;
     }
 
+    void setHeapSize(long heapSize) {
+      this.heapSize = heapSize;
+    }
+
     /**
     * @return {@link LimitScope} indicating scope in which the size limit is enforced
     */
@@ -638,6 +660,9 @@ public class ScannerContext {
       sb.append(", size:");
       sb.append(size);
 
+      sb.append(", heapsize:");
+      sb.append(heapSize);
+
       sb.append(", sizeScope:");
       sb.append(sizeScope);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 8c48aef..5c21a41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -603,10 +603,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
         // Update local tracking information
         count++;
-        totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
+        int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
+        totalBytesRead += cellSize;
 
         // Update the progress of the scanner context
-        scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+        scannerContext.incrementSizeProgress(cellSize, CellUtil.estimatedHeapSizeOf(cell));
         scannerContext.incrementBatchProgress(1);
 
         if (matcher.isUserScan() && totalBytesRead > maxRowSize) {