From 78c767df1b6dbe270140f1509bbc0ec09929e26c Mon Sep 17 00:00:00 2001 From: mbautin Date: Fri, 13 Apr 2012 19:38:57 -0700 Subject: [PATCH] [jira] [HBASE-5104] Provide a reliable intra-row pagination mechanism Summary: Porting Madhu's patch for intra-row pagination (rHBASEEIGHTNINEFBBRANCH1326043) to trunk. This is what we have in 89-fb just as a starting point (currently there are test failures). Test Plan: Run unit tests Reviewers: madhuvaidya, lhofhansl, Kannan, tedyu, stack, todd, JIRA CC: jxcn01 Differential Revision: https://reviews.facebook.net/D2799 --- .../java/org/apache/hadoop/hbase/client/Get.java | 64 +++- .../java/org/apache/hadoop/hbase/client/Scan.java | 86 ++++- .../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 24 + .../hbase/protobuf/generated/ClientProtos.java | 432 +++++++++++++++---- .../hadoop/hbase/regionserver/StoreScanner.java | 33 ++- hbase-server/src/main/protobuf/Client.proto | 4 + .../java/org/apache/hadoop/hbase/HTestConst.java | 27 ++- .../hbase/client/TestIntraRowPagination.java | 108 +++++ .../hbase/client/TestScannersFromClientSide.java | 457 ++++++++++++++++++++ 9 files changed, 1140 insertions(+), 95 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java index cabe137..ef1178e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -69,17 +69,29 @@ import java.util.TreeSet; @InterfaceStability.Stable public class Get extends OperationWithAttributes implements Writable, Row, Comparable { - private static final byte GET_VERSION = (byte)2; + + private static final byte VERSION_WITHOUT_PAGINATION = (byte) 2; + private static final byte VERSION_WITH_PAGINATION = (byte) 3; + private static final byte GET_VERSION = VERSION_WITH_PAGINATION; private byte [] row = null; private long lockId = -1L; private int maxVersions = 1; private boolean cacheBlocks = true; + private int storeLimit = -1; + private int storeOffset = 0; private Filter filter = null; private TimeRange tr = new TimeRange(); private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); + private byte getVersion() { + if (storeLimit != -1 || storeOffset != 0) { + return VERSION_WITH_PAGINATION; + } + return VERSION_WITHOUT_PAGINATION; + } + /** Constructor for Writable. DO NOT USE */ public Get() {} @@ -194,6 +206,27 @@ public class Get extends OperationWithAttributes } /** + * Set the maximum number of values to return per row per Column Family + * @param limit the maximum number of values returned / row / CF + * @return this for invocation chaining + */ + public Get setMaxResultsPerColumnFamily(int limit) { + this.storeLimit = limit; + return this; + } + + /** + * Set offset for the row per Column Family. This offset is only within a particular row/CF + * combination. It gets reset back to zero when we move to the next row or CF. + * @param offset is the number of kvs that will be skipped. + * @return this for invocation chaining + */ + public Get setRowOffsetPerColumnFamily(int offset) { + this.storeOffset = offset; + return this; + } + + /** * Apply the specified server-side filter when performing the Get. * Only {@link Filter#filterKeyValue(KeyValue)} is called AFTER all tests * for ttl, column match, deletes and max versions have been run. @@ -270,6 +303,24 @@ public class Get extends OperationWithAttributes } /** + * Method for retrieving the get's maximum number of values + * to return per Column Family + * @return the maximum number of values to fetch per CF + */ + public int getMaxResultsPerColumnFamily() { + return this.storeLimit; + } + + /** + * Method for retrieving the get's offset per row per column + * family (#kvs to be skipped) + * @return the row offset + */ + public int getRowOffsetPerColumnFamily() { + return this.storeOffset; + } + + /** * Method for retrieving the get's TimeRange * @return timeRange */ @@ -399,6 +450,10 @@ public class Get extends OperationWithAttributes this.row = Bytes.readByteArray(in); this.lockId = in.readLong(); this.maxVersions = in.readInt(); + if (version >= VERSION_WITH_PAGINATION) { + this.storeLimit = in.readInt(); + this.storeOffset = in.readInt(); + } boolean hasFilter = in.readBoolean(); if (hasFilter) { this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in))); @@ -429,10 +484,15 @@ public class Get extends OperationWithAttributes public void write(final DataOutput out) throws IOException { - out.writeByte(GET_VERSION); + byte version = getVersion(); + out.writeByte(version); Bytes.writeByteArray(out, this.row); out.writeLong(this.lockId); out.writeInt(this.maxVersions); + if (version >= VERSION_WITH_PAGINATION) { + out.writeInt(this.storeLimit); + out.writeInt(this.storeOffset); + } if(this.filter == null) { out.writeBoolean(false); } else { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java index e721627..d13ed82 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -91,11 +91,20 @@ public class Scan extends OperationWithAttributes implements Writable { private static final String RAW_ATTR = "_raw_"; private static final String ISOLATION_LEVEL = "_isolationlevel_"; - private static final byte SCAN_VERSION = (byte)3; + private static final byte VERSION_WITH_PAGINATION = (byte)4; + private static final byte VERSION_WITH_RESULT_SIZE = (byte)3; + private static final byte VERSION_WITH_ATTRIBUTES= (byte)2; + + private static final byte SCAN_VERSION = VERSION_WITH_PAGINATION; + private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; private int maxVersions = 1; private int batch = -1; + + private int storeLimit = -1; + private int storeOffset = 0; + // If application wants to collect scan metrics, it needs to // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE)) static public String SCAN_ATTRIBUTES_METRICS_ENABLE = @@ -115,6 +124,22 @@ public class Scan extends OperationWithAttributes implements Writable { new TreeMap>(Bytes.BYTES_COMPARATOR); /** + * @return the lowest possible version for this scan + */ + private byte getVersion() { + if (storeLimit != -1 || storeOffset != 0) { + return VERSION_WITH_PAGINATION; + } + if (maxResultSize != -1) { + return VERSION_WITH_RESULT_SIZE; + } + if (getAttributeSize() != 0) { + return VERSION_WITH_ATTRIBUTES; + } + return 1; + } + + /** * Create a Scan operation across all rows. */ public Scan() {} @@ -156,6 +181,8 @@ public class Scan extends OperationWithAttributes implements Writable { stopRow = scan.getStopRow(); maxVersions = scan.getMaxVersions(); batch = scan.getBatch(); + storeLimit = scan.getMaxResultsPerColumnFamily(); + storeOffset = scan.getRowOffsetPerColumnFamily(); caching = scan.getCaching(); maxResultSize = scan.getMaxResultSize(); cacheBlocks = scan.getCacheBlocks(); @@ -189,6 +216,8 @@ public class Scan extends OperationWithAttributes implements Writable { this.filter = get.getFilter(); this.cacheBlocks = get.getCacheBlocks(); this.maxVersions = get.getMaxVersions(); + this.storeLimit = get.getMaxResultsPerColumnFamily(); + this.storeOffset = get.getRowOffsetPerColumnFamily(); this.tr = get.getTimeRange(); this.familyMap = get.getFamilyMap(); } @@ -324,6 +353,22 @@ public class Scan extends OperationWithAttributes implements Writable { } /** + * Set the maximum number of values to return per row per Column Family + * @param limit the maximum number of values returned / row / CF + */ + public void setMaxResultsPerColumnFamily(int limit) { + this.storeLimit = limit; + } + + /** + * Set offset for the row per Column Family. + * @param offset is the number of kvs that will be skipped. + */ + public void setRowOffsetPerColumnFamily(int offset) { + this.storeOffset = offset; + } + + /** * Set the number of rows for caching that will be passed to scanners. * If not set, the default setting from {@link HTable#getScannerCaching()} will apply. * Higher caching values will enable faster scanners but will use more memory. @@ -435,6 +480,22 @@ public class Scan extends OperationWithAttributes implements Writable { } /** + * @return maximum number of values to return per row per CF + */ + public int getMaxResultsPerColumnFamily() { + return this.storeLimit; + } + + /** + * Method for retrieving the scan's offset per row per column + * family (#kvs to be skipped) + * @return row offset + */ + public int getRowOffsetPerColumnFamily() { + return this.storeOffset; + } + + /** * @return caching the number of rows fetched when calling next on a scanner */ public int getCaching() { @@ -591,6 +652,10 @@ public class Scan extends OperationWithAttributes implements Writable { this.stopRow = Bytes.readByteArray(in); this.maxVersions = in.readInt(); this.batch = in.readInt(); + if (version >= VERSION_WITH_PAGINATION) { + this.storeLimit = in.readInt(); + this.storeOffset = in.readInt(); + } this.caching = in.readInt(); this.cacheBlocks = in.readBoolean(); if(in.readBoolean()) { @@ -613,21 +678,26 @@ public class Scan extends OperationWithAttributes implements Writable { this.familyMap.put(family, set); } - if (version > 1) { + if (version >= VERSION_WITH_ATTRIBUTES) { readAttributes(in); } - if (version > 2) { + if (version >= VERSION_WITH_RESULT_SIZE) { this.maxResultSize = in.readLong(); } } public void write(final DataOutput out) throws IOException { - out.writeByte(SCAN_VERSION); + byte version = getVersion(); + out.writeByte(version); Bytes.writeByteArray(out, this.startRow); Bytes.writeByteArray(out, this.stopRow); out.writeInt(this.maxVersions); out.writeInt(this.batch); + if (version >= VERSION_WITH_PAGINATION) { + out.writeInt(this.storeLimit); + out.writeInt(this.storeOffset); + } out.writeInt(this.caching); out.writeBoolean(this.cacheBlocks); if(this.filter == null) { @@ -651,8 +721,12 @@ public class Scan extends OperationWithAttributes implements Writable { out.writeInt(0); } } - writeAttributes(out); - out.writeLong(maxResultSize); + if (version >= VERSION_WITH_ATTRIBUTES) { + writeAttributes(out); + } + if (version >= VERSION_WITH_RESULT_SIZE) { + out.writeLong(maxResultSize); + } } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 3bfaf9b..c808f61 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -345,6 +345,12 @@ public final class ProtobufUtil { if (proto.hasMaxVersions()) { get.setMaxVersions(proto.getMaxVersions()); } + if (proto.hasStoreLimit()) { + get.setMaxResultsPerColumnFamily(proto.getStoreLimit()); + } + if (proto.hasStoreOffset()) { + get.setRowOffsetPerColumnFamily(proto.getStoreOffset()); + } if (proto.hasTimeRange()) { HBaseProtos.TimeRange timeRange = proto.getTimeRange(); long minStamp = 0; @@ -612,6 +618,12 @@ public final class ProtobufUtil { } scanBuilder.addColumn(columnBuilder.build()); } + if (scan.getMaxResultsPerColumnFamily() >= 0) { + scanBuilder.setStoreLimit(scan.getMaxResultsPerColumnFamily()); + } + if (scan.getRowOffsetPerColumnFamily() > 0) { + scanBuilder.setStoreOffset(scan.getRowOffsetPerColumnFamily()); + } return scanBuilder.build(); } @@ -639,6 +651,12 @@ public final class ProtobufUtil { if (proto.hasMaxVersions()) { scan.setMaxVersions(proto.getMaxVersions()); } + if (proto.hasStoreLimit()) { + scan.setMaxResultsPerColumnFamily(proto.getStoreLimit()); + } + if (proto.hasStoreOffset()) { + scan.setRowOffsetPerColumnFamily(proto.getStoreOffset()); + } if (proto.hasTimeRange()) { HBaseProtos.TimeRange timeRange = proto.getTimeRange(); long minStamp = 0; @@ -770,6 +788,12 @@ public final class ProtobufUtil { builder.addColumn(columnBuilder.build()); } } + if (get.getMaxResultsPerColumnFamily() >= 0) { + builder.setStoreLimit(get.getMaxResultsPerColumnFamily()); + } + if (get.getRowOffsetPerColumnFamily() > 0) { + builder.setStoreOffset(get.getRowOffsetPerColumnFamily()); + } return builder.build(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 36b7258..0153bc9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -549,6 +549,14 @@ public final class ClientProtos { // optional bool cacheBlocks = 8 [default = true]; boolean hasCacheBlocks(); boolean getCacheBlocks(); + + // optional uint32 storeLimit = 9; + boolean hasStoreLimit(); + int getStoreLimit(); + + // optional uint32 storeOffset = 10; + boolean hasStoreOffset(); + int getStoreOffset(); } public static final class Get extends com.google.protobuf.GeneratedMessage @@ -687,6 +695,26 @@ public final class ClientProtos { return cacheBlocks_; } + // optional uint32 storeLimit = 9; + public static final int STORELIMIT_FIELD_NUMBER = 9; + private int storeLimit_; + public boolean hasStoreLimit() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + public int getStoreLimit() { + return storeLimit_; + } + + // optional uint32 storeOffset = 10; + public static final int STOREOFFSET_FIELD_NUMBER = 10; + private int storeOffset_; + public boolean hasStoreOffset() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + public int getStoreOffset() { + return storeOffset_; + } + private void initFields() { row_ = com.google.protobuf.ByteString.EMPTY; column_ = java.util.Collections.emptyList(); @@ -696,6 +724,8 @@ public final class ClientProtos { timeRange_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TimeRange.getDefaultInstance(); maxVersions_ = 1; cacheBlocks_ = true; + storeLimit_ = 0; + storeOffset_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -755,6 +785,12 @@ public final class ClientProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeBool(8, cacheBlocks_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeUInt32(9, storeLimit_); + } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeUInt32(10, storeOffset_); + } getUnknownFields().writeTo(output); } @@ -796,6 +832,14 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(8, cacheBlocks_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(9, storeLimit_); + } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(10, storeOffset_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -853,6 +897,16 @@ public final class ClientProtos { result = result && (getCacheBlocks() == other.getCacheBlocks()); } + result = result && (hasStoreLimit() == other.hasStoreLimit()); + if (hasStoreLimit()) { + result = result && (getStoreLimit() + == other.getStoreLimit()); + } + result = result && (hasStoreOffset() == other.hasStoreOffset()); + if (hasStoreOffset()) { + result = result && (getStoreOffset() + == other.getStoreOffset()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -894,6 +948,14 @@ public final class ClientProtos { hash = (37 * hash) + CACHEBLOCKS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getCacheBlocks()); } + if (hasStoreLimit()) { + hash = (37 * hash) + STORELIMIT_FIELD_NUMBER; + hash = (53 * hash) + getStoreLimit(); + } + if (hasStoreOffset()) { + hash = (37 * hash) + STOREOFFSET_FIELD_NUMBER; + hash = (53 * hash) + getStoreOffset(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -1046,6 +1108,10 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000040); cacheBlocks_ = true; bitField0_ = (bitField0_ & ~0x00000080); + storeLimit_ = 0; + bitField0_ = (bitField0_ & ~0x00000100); + storeOffset_ = 0; + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -1134,6 +1200,14 @@ public final class ClientProtos { to_bitField0_ |= 0x00000020; } result.cacheBlocks_ = cacheBlocks_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000040; + } + result.storeLimit_ = storeLimit_; + if (((from_bitField0_ & 0x00000200) == 0x00000200)) { + to_bitField0_ |= 0x00000080; + } + result.storeOffset_ = storeOffset_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1220,6 +1294,12 @@ public final class ClientProtos { if (other.hasCacheBlocks()) { setCacheBlocks(other.getCacheBlocks()); } + if (other.hasStoreLimit()) { + setStoreLimit(other.getStoreLimit()); + } + if (other.hasStoreOffset()) { + setStoreOffset(other.getStoreOffset()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1323,6 +1403,16 @@ public final class ClientProtos { cacheBlocks_ = input.readBool(); break; } + case 72: { + bitField0_ |= 0x00000100; + storeLimit_ = input.readUInt32(); + break; + } + case 80: { + bitField0_ |= 0x00000200; + storeOffset_ = input.readUInt32(); + break; + } } } } @@ -1968,6 +2058,48 @@ public final class ClientProtos { return this; } + // optional uint32 storeLimit = 9; + private int storeLimit_ ; + public boolean hasStoreLimit() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + public int getStoreLimit() { + return storeLimit_; + } + public Builder setStoreLimit(int value) { + bitField0_ |= 0x00000100; + storeLimit_ = value; + onChanged(); + return this; + } + public Builder clearStoreLimit() { + bitField0_ = (bitField0_ & ~0x00000100); + storeLimit_ = 0; + onChanged(); + return this; + } + + // optional uint32 storeOffset = 10; + private int storeOffset_ ; + public boolean hasStoreOffset() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + public int getStoreOffset() { + return storeOffset_; + } + public Builder setStoreOffset(int value) { + bitField0_ |= 0x00000200; + storeOffset_ = value; + onChanged(); + return this; + } + public Builder clearStoreOffset() { + bitField0_ = (bitField0_ & ~0x00000200); + storeOffset_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:Get) } @@ -8732,6 +8864,14 @@ public final class ClientProtos { // optional uint64 maxResultSize = 10; boolean hasMaxResultSize(); long getMaxResultSize(); + + // optional uint32 storeLimit = 11; + boolean hasStoreLimit(); + int getStoreLimit(); + + // optional uint32 storeOffset = 12; + boolean hasStoreOffset(); + int getStoreOffset(); } public static final class Scan extends com.google.protobuf.GeneratedMessage @@ -8890,6 +9030,26 @@ public final class ClientProtos { return maxResultSize_; } + // optional uint32 storeLimit = 11; + public static final int STORELIMIT_FIELD_NUMBER = 11; + private int storeLimit_; + public boolean hasStoreLimit() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + public int getStoreLimit() { + return storeLimit_; + } + + // optional uint32 storeOffset = 12; + public static final int STOREOFFSET_FIELD_NUMBER = 12; + private int storeOffset_; + public boolean hasStoreOffset() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + public int getStoreOffset() { + return storeOffset_; + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -8901,6 +9061,8 @@ public final class ClientProtos { cacheBlocks_ = true; batchSize_ = 0; maxResultSize_ = 0L; + storeLimit_ = 0; + storeOffset_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8962,6 +9124,12 @@ public final class ClientProtos { if (((bitField0_ & 0x00000080) == 0x00000080)) { output.writeUInt64(10, maxResultSize_); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeUInt32(11, storeLimit_); + } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + output.writeUInt32(12, storeOffset_); + } getUnknownFields().writeTo(output); } @@ -9011,6 +9179,14 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(10, maxResultSize_); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(11, storeLimit_); + } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(12, storeOffset_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -9078,6 +9254,16 @@ public final class ClientProtos { result = result && (getMaxResultSize() == other.getMaxResultSize()); } + result = result && (hasStoreLimit() == other.hasStoreLimit()); + if (hasStoreLimit()) { + result = result && (getStoreLimit() + == other.getStoreLimit()); + } + result = result && (hasStoreOffset() == other.hasStoreOffset()); + if (hasStoreOffset()) { + result = result && (getStoreOffset() + == other.getStoreOffset()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -9127,6 +9313,14 @@ public final class ClientProtos { hash = (37 * hash) + MAXRESULTSIZE_FIELD_NUMBER; hash = (53 * hash) + hashLong(getMaxResultSize()); } + if (hasStoreLimit()) { + hash = (37 * hash) + STORELIMIT_FIELD_NUMBER; + hash = (53 * hash) + getStoreLimit(); + } + if (hasStoreOffset()) { + hash = (37 * hash) + STOREOFFSET_FIELD_NUMBER; + hash = (53 * hash) + getStoreOffset(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -9283,6 +9477,10 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000100); maxResultSize_ = 0L; bitField0_ = (bitField0_ & ~0x00000200); + storeLimit_ = 0; + bitField0_ = (bitField0_ & ~0x00000400); + storeOffset_ = 0; + bitField0_ = (bitField0_ & ~0x00000800); return this; } @@ -9379,6 +9577,14 @@ public final class ClientProtos { to_bitField0_ |= 0x00000080; } result.maxResultSize_ = maxResultSize_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000100; + } + result.storeLimit_ = storeLimit_; + if (((from_bitField0_ & 0x00000800) == 0x00000800)) { + to_bitField0_ |= 0x00000200; + } + result.storeOffset_ = storeOffset_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -9471,6 +9677,12 @@ public final class ClientProtos { if (other.hasMaxResultSize()) { setMaxResultSize(other.getMaxResultSize()); } + if (other.hasStoreLimit()) { + setStoreLimit(other.getStoreLimit()); + } + if (other.hasStoreOffset()) { + setStoreOffset(other.getStoreOffset()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -9580,6 +9792,16 @@ public final class ClientProtos { maxResultSize_ = input.readUInt64(); break; } + case 88: { + bitField0_ |= 0x00000400; + storeLimit_ = input.readUInt32(); + break; + } + case 96: { + bitField0_ |= 0x00000800; + storeOffset_ = input.readUInt32(); + break; + } } } } @@ -10270,6 +10492,48 @@ public final class ClientProtos { return this; } + // optional uint32 storeLimit = 11; + private int storeLimit_ ; + public boolean hasStoreLimit() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + public int getStoreLimit() { + return storeLimit_; + } + public Builder setStoreLimit(int value) { + bitField0_ |= 0x00000400; + storeLimit_ = value; + onChanged(); + return this; + } + public Builder clearStoreLimit() { + bitField0_ = (bitField0_ & ~0x00000400); + storeLimit_ = 0; + onChanged(); + return this; + } + + // optional uint32 storeOffset = 12; + private int storeOffset_ ; + public boolean hasStoreOffset() { + return ((bitField0_ & 0x00000800) == 0x00000800); + } + public int getStoreOffset() { + return storeOffset_; + } + public Builder setStoreOffset(int value) { + bitField0_ |= 0x00000800; + storeOffset_ = value; + onChanged(); + return this; + } + public Builder clearStoreOffset() { + bitField0_ = (bitField0_ & ~0x00000800); + storeOffset_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:Scan) } @@ -21385,92 +21649,94 @@ public final class ClientProtos { static { java.lang.String[] descriptorData = { "\n\014Client.proto\032\013hbase.proto\"+\n\006Column\022\016\n" + - "\006family\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\320\001\n\003Get" + + "\006family\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\371\001\n\003Get" + "\022\013\n\003row\030\001 \002(\014\022\027\n\006column\030\002 \003(\0132\007.Column\022!" + "\n\tattribute\030\003 \003(\0132\016.NameBytesPair\022\016\n\006loc" + "kId\030\004 \001(\004\022\036\n\006filter\030\005 \001(\0132\016.NameBytesPai" + "r\022\035\n\ttimeRange\030\006 \001(\0132\n.TimeRange\022\026\n\013maxV" + "ersions\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004t" + - "rue\"\037\n\006Result\022\025\n\rkeyValueBytes\030\001 \003(\014\"r\n\n" + - "GetRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" + - "fier\022\021\n\003get\030\002 \002(\0132\004.Get\022\030\n\020closestRowBef", - "ore\030\003 \001(\010\022\025\n\rexistenceOnly\030\004 \001(\010\"6\n\013GetR" + - "esponse\022\027\n\006result\030\001 \001(\0132\007.Result\022\016\n\006exis" + - "ts\030\002 \001(\010\"\200\002\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006f" + - "amily\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014\022+\n\013compar" + - "eType\030\004 \002(\0162\026.Condition.CompareType\022\"\n\nc" + - "omparator\030\005 \002(\0132\016.NameBytesPair\"r\n\013Compa" + - "reType\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005E" + - "QUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQUA" + - "L\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006\"\306\004\n\006Mutate\022\013" + - "\n\003row\030\001 \002(\014\022&\n\nmutateType\030\002 \002(\0162\022.Mutate", - ".MutateType\022(\n\013columnValue\030\003 \003(\0132\023.Mutat" + - "e.ColumnValue\022!\n\tattribute\030\004 \003(\0132\016.NameB" + - "ytesPair\022\021\n\ttimestamp\030\005 \001(\004\022\016\n\006lockId\030\006 " + - "\001(\004\022\030\n\nwriteToWAL\030\007 \001(\010:\004true\022\035\n\ttimeRan" + - "ge\030\n \001(\0132\n.TimeRange\032\310\001\n\013ColumnValue\022\016\n\006" + - "family\030\001 \002(\014\022:\n\016qualifierValue\030\002 \003(\0132\".M" + - "utate.ColumnValue.QualifierValue\032m\n\016Qual" + - "ifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005value\030\002" + - " \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022&\n\ndeleteType\030\004 " + - "\001(\0162\022.Mutate.DeleteType\"<\n\nMutateType\022\n\n", - "\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DEL" + - "ETE\020\003\"U\n\nDeleteType\022\026\n\022DELETE_ONE_VERSIO" + - "N\020\000\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDEL" + - "ETE_FAMILY\020\002\"i\n\rMutateRequest\022 \n\006region\030" + - "\001 \002(\0132\020.RegionSpecifier\022\027\n\006mutate\030\002 \002(\0132" + - "\007.Mutate\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"" + - "<\n\016MutateResponse\022\027\n\006result\030\001 \001(\0132\007.Resu" + - "lt\022\021\n\tprocessed\030\002 \001(\010\"\201\002\n\004Scan\022\027\n\006column" + - "\030\001 \003(\0132\007.Column\022!\n\tattribute\030\002 \003(\0132\016.Nam" + - "eBytesPair\022\020\n\010startRow\030\003 \001(\014\022\017\n\007stopRow\030", - "\004 \001(\014\022\036\n\006filter\030\005 \001(\0132\016.NameBytesPair\022\035\n" + - "\ttimeRange\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVersi" + - "ons\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true\022" + - "\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001(" + - "\004\"\203\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regi" + - "onSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscan" + - "nerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014clo" + - "seScanner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006resul" + - "t\030\001 \003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n\013m" + - "oreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockRow", - "Request\022 \n\006region\030\001 \002(\0132\020.RegionSpecifie" + - "r\022\013\n\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006loc" + - "kId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowReque" + - "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006" + - "lockId\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024Bu" + - "lkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.Reg" + - "ionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkL" + - "oadHFileRequest.FamilyPath\032*\n\nFamilyPath" + - "\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLo" + - "adHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec", - "\022\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nm" + - "ethodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.Name" + - "StringPair\022!\n\tparameter\030\005 \003(\0132\016.NameByte" + - "sPair\"O\n\026ExecCoprocessorRequest\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132" + - "\005.Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005val" + - "ue\030\001 \002(\0132\016.NameBytesPair\"N\n\013MultiAction\022" + - "\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004." + - "Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult" + - "\022\035\n\005value\030\001 \001(\0132\016.NameBytesPair\022!\n\texcep", - "tion\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiReque" + - "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006" + - "action\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001" + - "(\010\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Ac" + - "tionResult2\221\003\n\rClientService\022 \n\003get\022\013.Ge" + - "tRequest\032\014.GetResponse\022)\n\006mutate\022\016.Mutat" + - "eRequest\032\017.MutateResponse\022#\n\004scan\022\014.Scan" + - "Request\032\r.ScanResponse\022,\n\007lockRow\022\017.Lock" + - "RowRequest\032\020.LockRowResponse\0222\n\tunlockRo" + - "w\022\021.UnlockRowRequest\032\022.UnlockRowResponse", - "\022>\n\rbulkLoadHFile\022\025.BulkLoadHFileRequest" + - "\032\026.BulkLoadHFileResponse\022D\n\017execCoproces" + - "sor\022\027.ExecCoprocessorRequest\032\030.ExecCopro" + - "cessorResponse\022&\n\005multi\022\r.MultiRequest\032\016" + - ".MultiResponseBB\n*org.apache.hadoop.hbas" + - "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" + - "\001\001" + "rue\022\022\n\nstoreLimit\030\t \001(\r\022\023\n\013storeOffset\030\n" + + " \001(\r\"\037\n\006Result\022\025\n\rkeyValueBytes\030\001 \003(\014\"r\n" + + "\nGetRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec", + "ifier\022\021\n\003get\030\002 \002(\0132\004.Get\022\030\n\020closestRowBe" + + "fore\030\003 \001(\010\022\025\n\rexistenceOnly\030\004 \001(\010\"6\n\013Get" + + "Response\022\027\n\006result\030\001 \001(\0132\007.Result\022\016\n\006exi" + + "sts\030\002 \001(\010\"\200\002\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006" + + "family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014\022+\n\013compa" + + "reType\030\004 \002(\0162\026.Condition.CompareType\022\"\n\n" + + "comparator\030\005 \002(\0132\016.NameBytesPair\"r\n\013Comp" + + "areType\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005" + + "EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQU" + + "AL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006\"\306\004\n\006Mutate\022", + "\013\n\003row\030\001 \002(\014\022&\n\nmutateType\030\002 \002(\0162\022.Mutat" + + "e.MutateType\022(\n\013columnValue\030\003 \003(\0132\023.Muta" + + "te.ColumnValue\022!\n\tattribute\030\004 \003(\0132\016.Name" + + "BytesPair\022\021\n\ttimestamp\030\005 \001(\004\022\016\n\006lockId\030\006" + + " \001(\004\022\030\n\nwriteToWAL\030\007 \001(\010:\004true\022\035\n\ttimeRa" + + "nge\030\n \001(\0132\n.TimeRange\032\310\001\n\013ColumnValue\022\016\n" + + "\006family\030\001 \002(\014\022:\n\016qualifierValue\030\002 \003(\0132\"." + + "Mutate.ColumnValue.QualifierValue\032m\n\016Qua" + + "lifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005value\030" + + "\002 \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022&\n\ndeleteType\030\004", + " \001(\0162\022.Mutate.DeleteType\"<\n\nMutateType\022\n" + + "\n\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DE" + + "LETE\020\003\"U\n\nDeleteType\022\026\n\022DELETE_ONE_VERSI" + + "ON\020\000\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDE" + + "LETE_FAMILY\020\002\"i\n\rMutateRequest\022 \n\006region" + + "\030\001 \002(\0132\020.RegionSpecifier\022\027\n\006mutate\030\002 \002(\013" + + "2\007.Mutate\022\035\n\tcondition\030\003 \001(\0132\n.Condition" + + "\"<\n\016MutateResponse\022\027\n\006result\030\001 \001(\0132\007.Res" + + "ult\022\021\n\tprocessed\030\002 \001(\010\"\252\002\n\004Scan\022\027\n\006colum" + + "n\030\001 \003(\0132\007.Column\022!\n\tattribute\030\002 \003(\0132\016.Na", + "meBytesPair\022\020\n\010startRow\030\003 \001(\014\022\017\n\007stopRow" + + "\030\004 \001(\014\022\036\n\006filter\030\005 \001(\0132\016.NameBytesPair\022\035" + + "\n\ttimeRange\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVers" + + "ions\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true" + + "\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001" + + "(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 " + + "\001(\r\"\203\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Re" + + "gionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tsc" + + "annerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014c" + + "loseScanner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006res", + "ult\030\001 \003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n" + + "\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockR" + + "owRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" + + "ier\022\013\n\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006l" + + "ockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowReq" + + "uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016" + + "\n\006lockId\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024" + + "BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.R" + + "egionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .Bul" + + "kLoadHFileRequest.FamilyPath\032*\n\nFamilyPa", + "th\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025Bulk" + + "LoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Ex" + + "ec\022\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n" + + "\nmethodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.Na" + + "meStringPair\022!\n\tparameter\030\005 \003(\0132\016.NameBy" + + "tesPair\"O\n\026ExecCoprocessorRequest\022 \n\006reg" + + "ion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(" + + "\0132\005.Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005v" + + "alue\030\001 \002(\0132\016.NameBytesPair\"N\n\013MultiActio" + + "n\022\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132", + "\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResu" + + "lt\022\035\n\005value\030\001 \001(\0132\016.NameBytesPair\022!\n\texc" + + "eption\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiReq" + + "uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034" + + "\n\006action\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003" + + " \001(\010\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r." + + "ActionResult2\221\003\n\rClientService\022 \n\003get\022\013." + + "GetRequest\032\014.GetResponse\022)\n\006mutate\022\016.Mut" + + "ateRequest\032\017.MutateResponse\022#\n\004scan\022\014.Sc" + + "anRequest\032\r.ScanResponse\022,\n\007lockRow\022\017.Lo", + "ckRowRequest\032\020.LockRowResponse\0222\n\tunlock" + + "Row\022\021.UnlockRowRequest\032\022.UnlockRowRespon" + + "se\022>\n\rbulkLoadHFile\022\025.BulkLoadHFileReque" + + "st\032\026.BulkLoadHFileResponse\022D\n\017execCoproc" + + "essor\022\027.ExecCoprocessorRequest\032\030.ExecCop" + + "rocessorResponse\022&\n\005multi\022\r.MultiRequest" + + "\032\016.MultiResponseBB\n*org.apache.hadoop.hb" + + "ase.protobuf.generatedB\014ClientProtosH\001\210\001" + + "\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -21490,7 +21756,7 @@ public final class ClientProtos { internal_static_Get_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Get_descriptor, - new java.lang.String[] { "Row", "Column", "Attribute", "LockId", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", }, + new java.lang.String[] { "Row", "Column", "Attribute", "LockId", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "StoreLimit", "StoreOffset", }, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get.Builder.class); internal_static_Result_descriptor = @@ -21570,7 +21836,7 @@ public final class ClientProtos { internal_static_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", }, + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", }, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class); internal_static_ScanRequest_descriptor = diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index c43fbc9..aae3a54 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -51,6 +51,9 @@ class StoreScanner extends NonLazyKeyValueScanner private KeyValueHeap heap; private boolean cacheBlocks; + private int countPerRow = 0; + private int storeLimit = -1; + private int storeOffset = 0; private String metricNamePrefix; // Used to indicate that the scanner has closed (see HBASE-1107) @@ -133,6 +136,12 @@ class StoreScanner extends NonLazyKeyValueScanner } } + // set storeLimit + this.storeLimit = scan.getMaxResultsPerColumnFamily(); + + // set rowOffset + this.storeOffset = scan.getRowOffsetPerColumnFamily(); + // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.comparator); @@ -342,6 +351,7 @@ class StoreScanner extends NonLazyKeyValueScanner // only call setRow if the row changes; avoids confusing the query matcher // if scanning intra-row if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) { + this.countPerRow = 0; matcher.setRow(peeked.getRow()); } @@ -371,11 +381,27 @@ class StoreScanner extends NonLazyKeyValueScanner if (f != null) { kv = f.transform(kv); } - results.add(kv); - if (metric != null) { - RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric, + this.countPerRow++; + if (storeLimit > -1 && + this.countPerRow > (storeLimit + storeOffset)) { + // do what SEEK_NEXT_ROW does. + if (!matcher.moreRowsMayExistAfter(kv)) { + outResult.addAll(results); + return false; + } + reseek(matcher.getKeyForNextRow(kv)); + break LOOP; + } + + // add to results only if we have skipped #storeOffset kvs + // also update metric accordingly + if (this.countPerRow > storeOffset) { + if (metric != null) { + RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric, kv.getLength()); + } + results.add(kv); } if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { @@ -533,6 +559,7 @@ class StoreScanner extends NonLazyKeyValueScanner kv = lastTopKey; } if ((matcher.row == null) || !kv.matchingRow(matcher.row)) { + this.countPerRow = 0; matcher.reset(); matcher.setRow(kv.getRow()); } diff --git hbase-server/src/main/protobuf/Client.proto hbase-server/src/main/protobuf/Client.proto index ad2cb93..4131836 100644 --- hbase-server/src/main/protobuf/Client.proto +++ hbase-server/src/main/protobuf/Client.proto @@ -46,6 +46,8 @@ message Get { optional TimeRange timeRange = 6; optional uint32 maxVersions = 7 [default = 1]; optional bool cacheBlocks = 8 [default = true]; + optional uint32 storeLimit = 9; + optional uint32 storeOffset = 10; } /** @@ -195,6 +197,8 @@ message Scan { optional bool cacheBlocks = 8 [default = true]; optional uint32 batchSize = 9; optional uint64 maxResultSize = 10; + optional uint32 storeLimit = 11; + optional uint32 storeOffset = 12; } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java index 62d0079..4bcca41 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java @@ -23,7 +23,10 @@ import java.util.Collections; import org.apache.hadoop.hbase.util.Bytes; -/** Similar to {@link HConstants} but for tests. */ +/** + * Similar to {@link HConstants} but for tests. Also provides some simple + * static utility functions to generate test data. + */ public class HTestConst { private HTestConst() { @@ -39,4 +42,26 @@ public class HTestConst { Collections.unmodifiableSet(new HashSet( Arrays.asList(new String[] { DEFAULT_CF_STR }))); + public static final String DEFAULT_ROW_STR = "MyTestRow"; + public static final byte[] DEFAULT_ROW_BYTES = Bytes.toBytes(DEFAULT_ROW_STR); + + public static final String DEFAULT_QUALIFIER_STR = "MyColumnQualifier"; + public static final byte[] DEFAULT_QUALIFIER_BYTES = Bytes.toBytes(DEFAULT_QUALIFIER_STR); + + public static String DEFAULT_VALUE_STR = "MyTestValue"; + public static byte[] DEFAULT_VALUE_BYTES = Bytes.toBytes(DEFAULT_VALUE_STR); + + /** + * Generate the given number of unique byte sequences by appending numeric + * suffixes (ASCII representations of decimal numbers). + */ + public static byte[][] makeNAscii(byte[] base, int n) { + byte [][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + byte[] tail = Bytes.toBytes(Integer.toString(i)); + ret[i] = Bytes.add(base, tail); + } + return ret; + } + } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java new file mode 100644 index 0000000..632b483 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test scan/get offset and limit settings within one row through HRegion API. + */ +@Category(SmallTests.class) +public class TestIntraRowPagination { + + private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * Test from client side for scan with maxResultPerCF set + * + * @throws Exception + */ + @Test + public void testScanLimitAndOffset() throws Exception { + //byte [] TABLE = HTestConst.DEFAULT_TABLE_BYTES; + byte [][] ROWS = HTestConst.makeNAscii(HTestConst.DEFAULT_ROW_BYTES, 2); + byte [][] FAMILIES = HTestConst.makeNAscii(HTestConst.DEFAULT_CF_BYTES, 3); + byte [][] QUALIFIERS = HTestConst.makeNAscii(HTestConst.DEFAULT_QUALIFIER_BYTES, 10); + + HTableDescriptor htd = new HTableDescriptor(HTestConst.DEFAULT_TABLE_BYTES); + HRegionInfo info = new HRegionInfo(HTestConst.DEFAULT_TABLE_BYTES, null, null, false); + for (byte[] family : FAMILIES) { + HColumnDescriptor hcd = new HColumnDescriptor(family); + htd.addFamily(hcd); + } + HRegion region = + HRegion.createHRegion(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + try { + Put put; + Scan scan; + Result result; + boolean toLog = true; + + List kvListExp = new ArrayList(); + + int storeOffset = 1; + int storeLimit = 3; + for (int r = 0; r < ROWS.length; r++) { + put = new Put(ROWS[r]); + for (int c = 0; c < FAMILIES.length; c++) { + for (int q = 0; q < QUALIFIERS.length; q++) { + KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, + HTestConst.DEFAULT_VALUE_BYTES); + put.add(kv); + if (storeOffset <= q && q < storeOffset + storeLimit) { + kvListExp.add(kv); + } + } + } + region.put(put); + } + + scan = new Scan(); + scan.setRowOffsetPerColumnFamily(storeOffset); + scan.setMaxResultsPerColumnFamily(storeLimit); + RegionScanner scanner = region.getScanner(scan); + List kvListScan = new ArrayList(); + List results = new ArrayList(); + while (scanner.next(results) || !results.isEmpty()) { + kvListScan.addAll(results); + results.clear(); + } + result = new Result(kvListScan); + TestScannersFromClientSide.verifyResult(result, kvListExp, toLog, + "Testing scan with storeOffset and storeLimit"); + } finally { + region.close(); + } + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java new file mode 100644 index 0000000..247fd48 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * A client-side test, mostly testing scanners with various parameters. + */ +@Category(MediumTests.class) +public class TestScannersFromClientSide { + private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte [] ROW = Bytes.toBytes("testRow"); + private static byte [] FAMILY = Bytes.toBytes("testFamily"); + private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte [] VALUE = Bytes.toBytes("testValue"); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + /** + * Test from client side for batch of scan + * + * @throws Exception + */ + @Test + public void testScanBatch() throws Exception { + byte [] TABLE = Bytes.toBytes("testScanBatch"); + byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + + Put put; + Scan scan; + Delete delete; + Result result; + ResultScanner scanner; + boolean toLog = true; + List kvListExp; + + // table: row, family, c0:0, c1:1, ... , c7:7 + put = new Put(ROW); + for (int i=0; i < QUALIFIERS.length; i++) { + KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); + put.add(kv); + } + ht.put(put); + + // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 + put = new Put(ROW); + KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); + put.add(kv); + ht.put(put); + + // delete upto ts: 3 + delete = new Delete(ROW); + delete.deleteFamily(FAMILY, 3); + ht.delete(delete); + + // without batch + scan = new Scan(ROW); + scan.setMaxVersions(); + scanner = ht.getScanner(scan); + + // c4:4, c5:5, c6:6, c7:7 + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); + result = scanner.next(); + verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); + + // with batch + scan = new Scan(ROW); + scan.setMaxVersions(); + scan.setBatch(2); + scanner = ht.getScanner(scan); + + // First batch: c4:4, c5:5 + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); + result = scanner.next(); + verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); + + // Second batch: c6:6, c7:7 + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); + result = scanner.next(); + verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); + + } + + /** + * Test from client side for get with maxResultPerCF set + * + * @throws Exception + */ + @Test + public void testGetMaxResults() throws Exception { + byte [] TABLE = Bytes.toBytes("testGetMaxResults"); + byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + + Get get; + Put put; + Result result; + boolean toLog = true; + List kvListExp; + + kvListExp = new ArrayList(); + // Insert one CF for row[0] + put = new Put(ROW); + for (int i=0; i < 10; i++) { + KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); + put.add(kv); + kvListExp.add(kv); + } + ht.put(put); + + get = new Get(ROW); + result = ht.get(get); + verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); + + get = new Get(ROW); + get.setMaxResultsPerColumnFamily(2); + result = ht.get(get); + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); + verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); + + // Filters: ColumnRangeFilter + get = new Get(ROW); + get.setMaxResultsPerColumnFamily(5); + get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], + true)); + result = ht.get(get); + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); + verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); + + // Insert two more CF for row[0] + // 20 columns for CF2, 10 columns for CF1 + put = new Put(ROW); + for (int i=0; i < QUALIFIERS.length; i++) { + KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); + put.add(kv); + } + ht.put(put); + + put = new Put(ROW); + for (int i=0; i < 10; i++) { + KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); + put.add(kv); + } + ht.put(put); + + get = new Get(ROW); + get.setMaxResultsPerColumnFamily(12); + get.addFamily(FAMILIES[1]); + get.addFamily(FAMILIES[2]); + result = ht.get(get); + kvListExp = new ArrayList(); + //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 + for (int i=0; i < 10; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); + } + for (int i=0; i < 2; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); + } + for (int i=10; i < 20; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); + } + verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); + + // Filters: ColumnRangeFilter and ColumnPrefixFilter + get = new Get(ROW); + get.setMaxResultsPerColumnFamily(3); + get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); + result = ht.get(get); + kvListExp = new ArrayList(); + for (int i=2; i < 5; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); + } + for (int i=2; i < 5; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); + } + for (int i=2; i < 5; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); + } + verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); + + get = new Get(ROW); + get.setMaxResultsPerColumnFamily(7); + get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); + result = ht.get(get); + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); + for (int i=10; i < 16; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); + } + verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); + + } + + /** + * Test from client side for scan with maxResultPerCF set + * + * @throws Exception + */ + @Test + public void testScanMaxResults() throws Exception { + byte [] TABLE = Bytes.toBytes("testScanLimit"); + byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); + byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + + Put put; + Scan scan; + Result result; + boolean toLog = true; + List kvListExp, kvListScan; + + kvListExp = new ArrayList(); + + for (int r=0; r < ROWS.length; r++) { + put = new Put(ROWS[r]); + for (int c=0; c < FAMILIES.length; c++) { + for (int q=0; q < QUALIFIERS.length; q++) { + KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + put.add(kv); + if (q < 4) { + kvListExp.add(kv); + } + } + } + ht.put(put); + } + + scan = new Scan(); + scan.setMaxResultsPerColumnFamily(4); + ResultScanner scanner = ht.getScanner(scan); + kvListScan = new ArrayList(); + while ((result = scanner.next()) != null) { + for (KeyValue kv : result.list()) { + kvListScan.add(kv); + } + } + result = new Result(kvListScan); + verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); + + } + + /** + * Test from client side for get with rowOffset + * + * @throws Exception + */ + @Test + public void testGetRowOffset() throws Exception { + byte [] TABLE = Bytes.toBytes("testGetRowOffset"); + byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + + Get get; + Put put; + Result result; + boolean toLog = true; + List kvListExp; + + // Insert one CF for row + kvListExp = new ArrayList(); + put = new Put(ROW); + for (int i=0; i < 10; i++) { + KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); + put.add(kv); + // skipping first two kvs + if (i < 2) continue; + kvListExp.add(kv); + } + ht.put(put); + + //setting offset to 2 + get = new Get(ROW); + get.setRowOffsetPerColumnFamily(2); + result = ht.get(get); + verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); + + //setting offset to 20 + get = new Get(ROW); + get.setRowOffsetPerColumnFamily(20); + result = ht.get(get); + kvListExp = new ArrayList(); + verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); + + //offset + maxResultPerCF + get = new Get(ROW); + get.setRowOffsetPerColumnFamily(4); + get.setMaxResultsPerColumnFamily(5); + result = ht.get(get); + kvListExp = new ArrayList(); + for (int i=4; i < 9; i++) { + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); + } + verifyResult(result, kvListExp, toLog, + "Testing offset + setMaxResultsPerCF"); + + // Filters: ColumnRangeFilter + get = new Get(ROW); + get.setRowOffsetPerColumnFamily(1); + get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], + true)); + result = ht.get(get); + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); + verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); + + // Insert into two more CFs for row + // 10 columns for CF2, 10 columns for CF1 + for(int j=2; j > 0; j--) { + put = new Put(ROW); + for (int i=0; i < 10; i++) { + KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); + put.add(kv); + } + ht.put(put); + } + + get = new Get(ROW); + get.setRowOffsetPerColumnFamily(4); + get.setMaxResultsPerColumnFamily(2); + get.addFamily(FAMILIES[1]); + get.addFamily(FAMILIES[2]); + result = ht.get(get); + kvListExp = new ArrayList(); + //Exp: CF1:q4, q5, CF2: q4, q5 + kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); + kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); + verifyResult(result, kvListExp, toLog, + "Testing offset + multiple CFs + maxResults"); + + } + + static void verifyResult(Result result, List expKvList, boolean toLog, + String msg) { + + LOG.info(msg); + LOG.info("Expected count: " + expKvList.size()); + LOG.info("Actual count: " + result.size()); + if (expKvList.size() == 0) + return; + + int i = 0; + for (KeyValue kv : result.raw()) { + if (i >= expKvList.size()) { + break; // we will check the size later + } + + KeyValue kvExp = expKvList.get(i++); + if (toLog) { + LOG.info("get kv is: " + kv.toString()); + LOG.info("exp kv is: " + kvExp.toString()); + } + assertTrue("Not equal", kvExp.equals(kv)); + } + + assertEquals(expKvList.size(), result.size()); + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); + +} -- 1.7.4.4