diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java
index 43d9227..66bc9b1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java
@@ -252,7 +252,10 @@ public class ByteBufferKeyValue extends ByteBufferCell implements ExtendedCell {
 
   @Override
   public long heapSize() {
-    return ClassSize.align(FIXED_OVERHEAD + ClassSize.align(length));
+    if (this.buf.hasArray()) {
+      return ClassSize.align(FIXED_OVERHEAD + length);
+    }
+    return ClassSize.align(FIXED_OVERHEAD);
   }
 
   @Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 1dab10c..28c1d88 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -772,6 +772,7 @@ public final class CellUtil {
     @Override
     public long heapSize() {
       long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell);
+      // this.tags is an on-heap byte[]
       if (this.tags != null) {
         sum += ClassSize.sizeOf(this.tags, this.tags.length);
       }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 067a077..cef51d8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -667,7 +667,7 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
 
     @Override
     public long heapSize() {
-      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
+      return FIXED_OVERHEAD;
     }
 
     @Override
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
index da56e8c..addb9f1 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
@@ -569,7 +569,7 @@ public class PrefixTreeSeeker implements EncodedSeeker {
 
     @Override
    public long heapSize() {
-      return FIXED_OVERHEAD + rowLength + famLength + qualLength + valLength + tagsLength;
+      return FIXED_OVERHEAD;
    }
 
     @Override
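The heapSize() changes above all apply one rule: a cell's payload bytes count toward heap size only when the backing buffer actually lives on the Java heap; for an off-heap (direct) buffer, only the fixed object overhead is charged. A minimal illustrative sketch of that rule follows; the class name, constant value, and fields are stand-ins, not the real HBase types.

    import java.nio.ByteBuffer;

    final class OffheapAwareCell {
      // Assumed per-object overhead; the real classes compute this via ClassSize and align it.
      private static final long FIXED_OVERHEAD = 64L;
      private final ByteBuffer buf;
      private final int length;

      OffheapAwareCell(ByteBuffer buf, int length) {
        this.buf = buf;
        this.length = length;
      }

      long heapSize() {
        // Heap-backed buffer: charge the cell bytes. Direct buffer: charge only the fixed overhead.
        return buf.hasArray() ? FIXED_OVERHEAD + length : FIXED_OVERHEAD;
      }
    }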
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c3db588..5fa6ca1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6059,7 +6059,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // may need to be reset a few times if rows are being filtered out so we save the initial
       // progress.
       int initialBatchProgress = scannerContext.getBatchProgress();
-      long initialSizeProgress = scannerContext.getSizeProgress();
+      long initialSizeProgress = scannerContext.getDataSizeProgress();
+      long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
       long initialTimeProgress = scannerContext.getTimeProgress();
 
       // The loop here is used only when at some point during the next we determine
@@ -6073,7 +6074,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (scannerContext.getKeepProgress()) {
           // Progress should be kept. Reset to initial values seen at start of method invocation.
           scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-            initialTimeProgress);
+            initialHeapSizeProgress, initialTimeProgress);
         } else {
           scannerContext.clearProgress();
         }
@@ -6175,14 +6176,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         long timeProgress = scannerContext.getTimeProgress();
         if (scannerContext.getKeepProgress()) {
           scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-            initialTimeProgress);
+            initialHeapSizeProgress, initialTimeProgress);
         } else {
           scannerContext.clearProgress();
         }
         scannerContext.setTimeProgress(timeProgress);
         scannerContext.incrementBatchProgress(results.size());
         for (Cell cell : results) {
-          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+          scannerContext.incrementSizeProgress(CellUtil.estimatedSerializedSizeOf(cell),
+            CellUtil.estimatedHeapSizeOf(cell));
         }
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
index 3e0d7e8..46bd68e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -63,7 +63,7 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
-  void setSizeProgress(long sizeProgress) {
+  void setSizeProgress(long sizeProgress, long heapSizeProgress) {
    // Do nothing. NoLimitScannerContext instances are immutable post-construction
   }
 
@@ -78,7 +78,7 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
-  void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
+  void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
    // Do nothing. NoLimitScannerContext instances are immutable post-construction
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 6b2056a..984b965 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1181,7 +1181,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   Object addSize(RpcCallContext context, Result r, Object lastBlock) {
     if (context != null && r != null && !r.isEmpty()) {
       for (Cell c : r.rawCells()) {
-        context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
+        context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(c));
 
         // Since byte buffers can point all kinds of crazy places it's harder to keep track
         // of which blocks are kept alive by what byte buffer.
@@ -2835,7 +2835,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // Configure with limits for this RPC. Set keep progress true since size progress
           // towards size limit should be kept between calls to nextRaw
           ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
-          contextBuilder.setSizeLimit(sizeScope, maxResultSize);
+          // maxResultSize - the limit is reached when either the total data size (key + value bytes)
+          // of the cells being read or the total heap size occupied by those cells hits this value.
+          contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
           contextBuilder.setBatchLimit(scanner.getBatch());
           contextBuilder.setTimeLimit(timeScope, timeLimit);
           contextBuilder.setTrackMetrics(trackMetrics);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index dc21628..7f1f066 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -104,7 +104,7 @@ public class ScannerContext {
     if (limitsToCopy != null) this.limits.copy(limitsToCopy);
 
     // Progress fields are initialized to 0
-    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
+    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
@@ -150,9 +150,11 @@ public class ScannerContext {
   /**
    * Progress towards the size limit has been made. Increment internal tracking of size progress
    */
-  void incrementSizeProgress(long size) {
-    long currentSize = progress.getSize();
-    progress.setSize(currentSize + size);
+  void incrementSizeProgress(long dataSize, long heapSize) {
+    long currentSize = progress.getDataSize();
+    progress.setDataSize(currentSize + dataSize);
+    long curHeapSize = progress.getHeapSize();
+    progress.setHeapSize(curHeapSize + heapSize);
   }
 
   /**
@@ -166,22 +168,27 @@ public class ScannerContext {
     return progress.getBatch();
   }
 
-  long getSizeProgress() {
-    return progress.getSize();
+  long getDataSizeProgress() {
+    return progress.getDataSize();
+  }
+
+  long getHeapSizeProgress() {
+    return progress.getHeapSize();
   }
 
   long getTimeProgress() {
     return progress.getTime();
   }
 
-  void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
+  void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
     setBatchProgress(batchProgress);
-    setSizeProgress(sizeProgress);
+    setSizeProgress(sizeProgress, heapSizeProgress);
     setTimeProgress(timeProgress);
   }
 
-  void setSizeProgress(long sizeProgress) {
-    progress.setSize(sizeProgress);
+  void setSizeProgress(long sizeProgress, long heapSizeProgress) {
+    progress.setDataSize(sizeProgress);
+    progress.setHeapSize(heapSizeProgress);
   }
 
   void setBatchProgress(int batchProgress) {
@@ -197,7 +204,7 @@ public class ScannerContext {
   * values
   */
  void clearProgress() {
-    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
+    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
   }
 
   /**
@@ -240,7 +247,8 @@ public class ScannerContext {
   * @return true if the size limit can be enforced in the checker's scope
   */
  boolean hasSizeLimit(LimitScope checkerScope) {
-    return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
+    return limits.canEnforceSizeLimitFromScope(checkerScope)
+        && (limits.getDataSize() > 0 || limits.getHeapSize() > 0);
   }
 
   /**
@@ -277,8 +285,8 @@ public class ScannerContext {
     return limits.getBatch();
   }
 
-  long getSizeLimit() {
-    return limits.getSize();
+  long getDataSizeLimit() {
+    return limits.getDataSize();
   }
 
   long getTimeLimit() {
@@ -298,7 +306,8 @@ public class ScannerContext {
   * @return true when the limit is enforceable from the checker's scope and it has been reached
   */
  boolean checkSizeLimit(LimitScope checkerScope) {
-    return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
+    return hasSizeLimit(checkerScope) && (progress.getDataSize() >= limits.getDataSize()
+        || progress.getHeapSize() >= limits.getHeapSize());
   }
 
   /**
@@ -370,9 +379,10 @@ public class ScannerContext {
       return this;
     }
 
-    public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
-      limits.setSize(sizeLimit);
+    public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit, long heapSizeLimit) {
+      limits.setDataSize(sizeLimit);
       limits.setSizeScope(sizeScope);
+      limits.setHeapSize(heapSizeLimit);
       return this;
     }
 
@@ -515,7 +525,8 @@ public class ScannerContext {
     int batch = DEFAULT_BATCH;
 
     LimitScope sizeScope = DEFAULT_SCOPE;
-    long size = DEFAULT_SIZE;
+    long dataSize = DEFAULT_SIZE; // The sum of cell sizes(key + value)
+    long heapSize = DEFAULT_SIZE;
 
     LimitScope timeScope = DEFAULT_SCOPE;
     long time = DEFAULT_TIME;
@@ -526,14 +537,15 @@ public class ScannerContext {
     LimitFields() {
     }
 
-    LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
-      setFields(batch, sizeScope, size, timeScope, time);
+    LimitFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
+        long time) {
+      setFields(batch, sizeScope, size, heapSize, timeScope, time);
     }
 
     void copy(LimitFields limitsToCopy) {
       if (limitsToCopy != null) {
-        setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
-          limitsToCopy.getTimeScope(), limitsToCopy.getTime());
+        setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(),
+          limitsToCopy.getHeapSize(), limitsToCopy.getTimeScope(), limitsToCopy.getTime());
       }
     }
 
@@ -543,10 +555,12 @@ public class ScannerContext {
     * @param sizeScope
     * @param size
     */
-    void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
+    void setFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
+        long time) {
       setBatch(batch);
       setSizeScope(sizeScope);
-      setSize(size);
+      setDataSize(size);
+      setHeapSize(heapSize);
       setTimeScope(timeScope);
       setTime(time);
     }
@@ -567,12 +581,20 @@ public class ScannerContext {
       return LimitScope.BETWEEN_CELLS.canEnforceLimitFromScope(checkerScope);
     }
 
-    long getSize() {
-      return this.size;
+    long getDataSize() {
+      return this.dataSize;
+    }
+
+    long getHeapSize() {
+      return this.heapSize;
     }
 
-    void setSize(long size) {
-      this.size = size;
+    void setDataSize(long size) {
+      this.dataSize = size;
+    }
+
+    void setHeapSize(long heapSize) {
+      this.heapSize = heapSize;
     }
 
     /**
@@ -635,8 +657,11 @@ public class ScannerContext {
       sb.append("batch:");
       sb.append(batch);
 
-      sb.append(", size:");
-      sb.append(size);
+      sb.append(", dataSize:");
+      sb.append(dataSize);
+
+      sb.append(", heapSize:");
+      sb.append(heapSize);
 
       sb.append(", sizeScope:");
       sb.append(sizeScope);
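The ScannerContext changes above split the single size counter into two dimensions that are tracked and checked together. The stand-alone sketch below mirrors the new semantics of incrementSizeProgress() and checkSizeLimit(); it is an illustration under those assumptions, not code from the patch. Progress advances on both axes per cell, and the limit trips as soon as either axis reaches its cap.

    /** Illustrative stand-in for the dual size tracking that LimitFields now performs. */
    final class DualSizeProgress {
      private long dataSize; // serialized size of the cells read so far (key + value bytes)
      private long heapSize; // heap memory occupied by the cells read so far

      /** Mirrors incrementSizeProgress(dataSize, heapSize): both counters advance per cell. */
      void increment(long dataDelta, long heapDelta) {
        dataSize += dataDelta;
        heapSize += heapDelta;
      }

      /** Mirrors checkSizeLimit(): reaching either cap is enough to stop filling the result. */
      boolean limitReached(long dataSizeLimit, long heapSizeLimit) {
        return dataSize >= dataSizeLimit || heapSize >= heapSizeLimit;
      }
    }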
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 8c48aef..5c21a41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -603,10 +603,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
           // Update local tracking information
           count++;
-          totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
+          int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
+          totalBytesRead += cellSize;
 
           // Update the progress of the scanner context
-          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+          scannerContext.incrementSizeProgress(cellSize, CellUtil.estimatedHeapSizeOf(cell));
           scannerContext.incrementBatchProgress(1);
 
           if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
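The StoreScanner and HRegion loops now feed both numbers per cell: CellUtil.estimatedSerializedSizeOf(cell) toward the data-size limit and CellUtil.estimatedHeapSizeOf(cell) toward the heap-size limit. The self-contained example below uses hypothetical figures to show why the two can diverge for a cell backed by an off-heap buffer, which is the case these changes account for.

    /** Hypothetical numbers only; real values come from CellUtil and ClassSize. */
    public final class SizeDivergenceExample {
      public static void main(String[] args) {
        long keyLen = 30;        // assumed key length in bytes
        long valueLen = 1024;    // assumed value length in bytes
        long fixedOverhead = 64; // assumed fixed object overhead of an off-heap backed cell

        long dataSizeDelta = keyLen + valueLen; // counts toward the data-size limit (1054 bytes)
        long heapSizeDelta = fixedOverhead;     // off-heap cell: only the object overhead is on heap

        System.out.println("dataSize += " + dataSizeDelta + ", heapSize += " + heapSizeDelta);
      }
    }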