diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 1814514..129dbdb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -508,13 +508,13 @@ public class ClientScanner extends AbstractClientScanner { public boolean renewLease() { if (callable != null) { // do not return any rows, do not advance the scanner - callable.setCaching(0); + callable.setRenew(true); try { this.caller.callWithoutRetries(callable, this.scannerTimeout); } catch (Exception e) { return false; } finally { - callable.setCaching(this.caching); + callable.setRenew(false); } return true; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index e6c2c8e..81a8394 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -208,7 +208,7 @@ public class ClientSmallScanner extends ClientScanner { throw new InterruptedIOException(); } ScanRequest request = RequestConverter.buildScanRequest(getLocation() - .getRegionInfo().getRegionName(), getScan(), getCaching(), true); + .getRegionInfo().getRegionName(), getScan(), getCaching(), true, renew); ScanResponse response = null; try { controller.setPriority(getTableName()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index f8da289..8d1c20d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -65,6 +65,7 @@ public class ScannerCallable extends RegionServerCallable { private long scannerId = -1L; protected boolean instantiated = false; protected boolean closed = false; + protected boolean renew = false; private Scan scan; private int caching = 1; protected ScanMetrics scanMetrics; @@ -166,7 +167,8 @@ public class ScannerCallable extends RegionServerCallable { ScanRequest request = null; try { incRPCcallsMetrics(); - request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); + request = + RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, renew); ScanResponse response = null; try { controller.setPriority(getTableName()); @@ -338,6 +340,10 @@ public class ScannerCallable extends RegionServerCallable { this.closed = true; } + public void setRenew(boolean val) { + this.renew = val; + } + /** * @return the HRegionInfo for the current region */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index f244a50..92b7290 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -490,6 +490,31 @@ public final class RequestConverter { } /** + * Create a protocol buffer ScanRequest for a client Scan + * + * @param regionName + * @param scan + * @param numberOfRows + * @param closeScanner + * @param renew + * @return a scan request + * @throws IOException + */ + public static ScanRequest buildScanRequest(final byte[] regionName, + final Scan scan, final int numberOfRows, + final boolean closeScanner, final boolean renew) throws IOException { + ScanRequest.Builder builder = ScanRequest.newBuilder(); + RegionSpecifier region = buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionName); + builder.setNumberOfRows(numberOfRows); + builder.setCloseScanner(closeScanner); + builder.setRegion(region); + builder.setScan(ProtobufUtil.toScan(scan)); + builder.setRenew(renew); + return builder.build(); + } + + /** * Create a protocol buffer ScanRequest for a scanner id * * @param scannerId @@ -516,12 +541,13 @@ public final class RequestConverter { * @return a scan request */ public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final long nextCallSeq) { + final boolean closeScanner, final long nextCallSeq, final boolean renew) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); builder.setScannerId(scannerId); builder.setNextCallSeq(nextCallSeq); + builder.setRenew(renew); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 14ddb58..30de3c6 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -15870,6 +15870,16 @@ public final class ClientProtos { * optional uint64 next_call_seq = 6; */ long getNextCallSeq(); + + // optional bool renew = 7 [default = false]; + /** + * optional bool renew = 7 [default = false]; + */ + boolean hasRenew(); + /** + * optional bool renew = 7 [default = false]; + */ + boolean getRenew(); } /** * Protobuf type {@code ScanRequest} @@ -15981,6 +15991,11 @@ public final class ClientProtos { nextCallSeq_ = input.readUInt64(); break; } + case 56: { + bitField0_ |= 0x00000040; + renew_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -16129,6 +16144,22 @@ public final class ClientProtos { return nextCallSeq_; } + // optional bool renew = 7 [default = false]; + public static final int RENEW_FIELD_NUMBER = 7; + private boolean renew_; + /** + * optional bool renew = 7 [default = false]; + */ + public boolean hasRenew() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional bool renew = 7 [default = false]; + */ + public boolean getRenew() { + return renew_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -16136,6 +16167,7 @@ public final class ClientProtos { numberOfRows_ = 0; closeScanner_ = false; nextCallSeq_ = 0L; + renew_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16179,6 +16211,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt64(6, nextCallSeq_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeBool(7, renew_); + } getUnknownFields().writeTo(output); } @@ -16212,6 +16247,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(6, nextCallSeq_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(7, renew_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16265,6 +16304,11 @@ public final class ClientProtos { result = result && (getNextCallSeq() == other.getNextCallSeq()); } + result = result && (hasRenew() == other.hasRenew()); + if (hasRenew()) { + result = result && (getRenew() + == other.getRenew()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -16302,6 +16346,10 @@ public final class ClientProtos { hash = (37 * hash) + NEXT_CALL_SEQ_FIELD_NUMBER; hash = (53 * hash) + hashLong(getNextCallSeq()); } + if (hasRenew()) { + hash = (37 * hash) + RENEW_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getRenew()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -16446,6 +16494,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000010); nextCallSeq_ = 0L; bitField0_ = (bitField0_ & ~0x00000020); + renew_ = false; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -16506,6 +16556,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000020; } result.nextCallSeq_ = nextCallSeq_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.renew_ = renew_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -16540,6 +16594,9 @@ public final class ClientProtos { if (other.hasNextCallSeq()) { setNextCallSeq(other.getNextCallSeq()); } + if (other.hasRenew()) { + setRenew(other.getRenew()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -16945,6 +17002,39 @@ public final class ClientProtos { return this; } + // optional bool renew = 7 [default = false]; + private boolean renew_ ; + /** + * optional bool renew = 7 [default = false]; + */ + public boolean hasRenew() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional bool renew = 7 [default = false]; + */ + public boolean getRenew() { + return renew_; + } + /** + * optional bool renew = 7 [default = false]; + */ + public Builder setRenew(boolean value) { + bitField0_ |= 0x00000040; + renew_ = value; + onChanged(); + return this; + } + /** + * optional bool renew = 7 [default = false]; + */ + public Builder clearRenew() { + bitField0_ = (bitField0_ & ~0x00000040); + renew_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -31785,61 +31875,61 @@ public final class ClientProtos { "ze\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024\n\014store_o" + "ffset\030\014 \001(\r\022&\n\036load_column_families_on_d" + "emand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010reversed\030\017" + - " \001(\010:\005false\022\017\n\007caching\030\021 \001(\r\"\236\001\n\013ScanReq" + + " \001(\010:\005false\022\017\n\007caching\030\021 \001(\r\"\264\001\n\013ScanReq" + "uest\022 \n\006region\030\001 \001(\0132\020.RegionSpecifier\022\023" + "\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022" + "\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_scanner" + - "\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\"\231\001\n\014ScanRe", - "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" + - "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" + - "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\036\n\026mor" + - "e_results_in_region\030\010 \001(\010\"\263\001\n\024BulkLoadHF" + - "ileRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" + - "fier\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFil" + - "eRequest.FamilyPath\022\026\n\016assign_seq_num\030\003 " + - "\001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pat" + - "h\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006load" + - "ed\030\001 \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003ro", - "w\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_" + - "name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030Coprocess" + - "orServiceResult\022\035\n\005value\030\001 \001(\0132\016.NameByt" + - "esPair\"d\n\031CoprocessorServiceRequest\022 \n\006r" + - "egion\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 " + - "\002(\0132\027.CoprocessorServiceCall\"]\n\032Coproces" + - "sorServiceResponse\022 \n\006region\030\001 \002(\0132\020.Reg" + - "ionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesP" + - "air\"{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation" + - "\030\002 \001(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Ge", - "t\022-\n\014service_call\030\004 \001(\0132\027.CoprocessorSer" + - "viceCall\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\013" + - "2\020.RegionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006ac" + - "tion\030\003 \003(\0132\007.Action\"c\n\017RegionLoadStats\022\027" + - "\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy" + - "\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001(\005:\0010" + - "\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(\r\022\027\n" + - "\006result\030\002 \001(\0132\007.Result\022!\n\texception\030\003 \001(" + - "\0132\016.NameBytesPair\0221\n\016service_result\030\004 \001(" + - "\0132\031.CoprocessorServiceResult\022#\n\tloadStat", - "s\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionActio" + - "nResult\022-\n\021resultOrException\030\001 \003(\0132\022.Res" + - "ultOrException\022!\n\texception\030\002 \001(\0132\016.Name" + - "BytesPair\"f\n\014MultiRequest\022#\n\014regionActio" + - "n\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup\030\002 \001" + - "(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n\rMul" + - "tiResponse\022/\n\022regionActionResult\030\001 \003(\0132\023" + - ".RegionActionResult\022\021\n\tprocessed\030\002 \001(\0102\205" + - "\003\n\rClientService\022 \n\003Get\022\013.GetRequest\032\014.G" + - "etResponse\022)\n\006Mutate\022\016.MutateRequest\032\017.M", - "utateResponse\022#\n\004Scan\022\014.ScanRequest\032\r.Sc" + - "anResponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHF" + - "ileRequest\032\026.BulkLoadHFileResponse\022F\n\013Ex" + - "ecService\022\032.CoprocessorServiceRequest\032\033." + - "CoprocessorServiceResponse\022R\n\027ExecRegion" + - "ServerService\022\032.CoprocessorServiceReques" + - "t\032\033.CoprocessorServiceResponse\022&\n\005Multi\022" + - "\r.MultiRequest\032\016.MultiResponseBB\n*org.ap" + - "ache.hadoop.hbase.protobuf.generatedB\014Cl" + - "ientProtosH\001\210\001\001\240\001\001" + "\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\024\n\005renew\030\007", + " \001(\010:\005false\"\231\001\n\014ScanResponse\022\030\n\020cells_pe" + + "r_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mo" + + "re_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007results" + + "\030\005 \003(\0132\007.Result\022\036\n\026more_results_in_regio" + + "n\030\010 \001(\010\"\263\001\n\024BulkLoadHFileRequest\022 \n\006regi" + + "on\030\001 \002(\0132\020.RegionSpecifier\0225\n\013family_pat" + + "h\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyPat" + + "h\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022" + + "\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoa" + + "dHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026Coproc", + "essorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service" + + "_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007requ" + + "est\030\004 \002(\014\"9\n\030CoprocessorServiceResult\022\035\n" + + "\005value\030\001 \001(\0132\016.NameBytesPair\"d\n\031Coproces" + + "sorServiceRequest\022 \n\006region\030\001 \002(\0132\020.Regi" + + "onSpecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorS" + + "erviceCall\"]\n\032CoprocessorServiceResponse" + + "\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005va" + + "lue\030\002 \002(\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005i" + + "ndex\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationP", + "roto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014service_call\030" + + "\004 \001(\0132\027.CoprocessorServiceCall\"Y\n\014Region" + + "Action\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" + + "\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Actio" + + "n\"c\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001" + + "(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compa" + + "ctionPressure\030\003 \001(\005:\0010\"\266\001\n\021ResultOrExcep" + + "tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" + + "sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" + + "1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer", + "viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" + + "adStats\"f\n\022RegionActionResult\022-\n\021resultO" + + "rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" + + "exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" + + "Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" + + "tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 " + + "\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" + + "onActionResult\030\001 \003(\0132\023.RegionActionResul" + + "t\022\021\n\tprocessed\030\002 \001(\0102\205\003\n\rClientService\022 " + + "\n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Muta", + "te\022\016.MutateRequest\032\017.MutateResponse\022#\n\004S" + + "can\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBulk" + + "LoadHFile\022\025.BulkLoadHFileRequest\032\026.BulkL" + + "oadHFileResponse\022F\n\013ExecService\022\032.Coproc" + + "essorServiceRequest\032\033.CoprocessorService" + + "Response\022R\n\027ExecRegionServerService\022\032.Co" + + "processorServiceRequest\032\033.CoprocessorSer" + + "viceResponse\022&\n\005Multi\022\r.MultiRequest\032\016.M" + + "ultiResponseBB\n*org.apache.hadoop.hbase." + + "protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -31935,7 +32025,7 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "Renew", }); internal_static_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_ScanResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index b93b0ac..a535f59 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -254,6 +254,7 @@ message ScanRequest { optional uint32 number_of_rows = 4; optional bool close_scanner = 5; optional uint64 next_call_seq = 6; + optional bool renew = 7 [default = false]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 94e437f..26a7a4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3253,7 +3253,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa scannerName = String.valueOf(scannerId); ttl = this.scannerLeaseTimeoutPeriod; } - + if (request.hasRenew() && request.getRenew()) { + lease = leases.removeLease(scannerName); + if (lease != null && scanners.containsKey(scannerName)) { + leases.addLease(lease); + } + return builder.build(); + } if (rows > 0) { // if nextCallSeq does not match throw Exception straight away. This needs to be // performed even before checking of Lease. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index b06ea04..ca3ae8d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -490,29 +490,4 @@ public class TestFromClientSide3 { table.close(); } } - - @Test - public void testLeaseRenewal() throws Exception { - HTable table = TEST_UTIL.createTable( - Bytes.toBytes("testLeaseRenewal"), FAMILY); - Put p = new Put(ROW_BYTES); - p.add(FAMILY, COL_QUAL, VAL_BYTES); - table.put(p); - p = new Put(ANOTHERROW); - p.add(FAMILY, COL_QUAL, VAL_BYTES); - table.put(p); - Scan s = new Scan(); - s.setCaching(1); - ResultScanner rs = table.getScanner(s); - // make sure that calling renewLease does not impact the scan results - assertTrue(((AbstractClientScanner)rs).renewLease()); - assertTrue(Arrays.equals(rs.next().getRow(), ANOTHERROW)); - assertTrue(((AbstractClientScanner)rs).renewLease()); - assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); - assertTrue(((AbstractClientScanner)rs).renewLease()); - assertNull(rs.next()); - assertFalse(((AbstractClientScanner)rs).renewLease()); - rs.close(); - table.close(); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java new file mode 100644 index 0000000..c4885c9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java @@ -0,0 +1,122 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestLeaseRenewal { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); + private final static byte[] COL_QUAL = Bytes.toBytes("f1"); + private final static byte[] VAL_BYTES = Bytes.toBytes("v1"); + private final static byte[] ROW_BYTES = Bytes.toBytes("r1"); + private final static int leaseTimeout = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD/4; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, leaseTimeout); + TEST_UTIL.startMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + for (HTableDescriptor htd : TEST_UTIL.getHBaseAdmin().listTables()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + TEST_UTIL.deleteTable(htd.getTableName()); + } + } + + @Test + public void testLeaseRenewal() throws Exception { + HTable table = TEST_UTIL.createTable( + Bytes.toBytes("testLeaseRenewal"), FAMILY); + Put p = new Put(ROW_BYTES); + p.add(FAMILY, COL_QUAL, VAL_BYTES); + table.put(p); + p = new Put(ANOTHERROW); + p.add(FAMILY, COL_QUAL, VAL_BYTES); + table.put(p); + Scan s = new Scan(); + s.setCaching(1); + ResultScanner rs = table.getScanner(s); + // make sure that calling renewLease does not impact the scan results + assertTrue(((AbstractClientScanner)rs).renewLease()); + assertTrue(Arrays.equals(rs.next().getRow(), ANOTHERROW)); + // renew the lease a few times, long enough to be sure + // the lease would have expired otherwise + Thread.sleep(leaseTimeout/2); + assertTrue(((AbstractClientScanner)rs).renewLease()); + Thread.sleep(leaseTimeout/2); + assertTrue(((AbstractClientScanner)rs).renewLease()); + Thread.sleep(leaseTimeout/2); + assertTrue(((AbstractClientScanner)rs).renewLease()); + // make sure we haven't advanced the scanner + assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); + assertTrue(((AbstractClientScanner)rs).renewLease()); + // make sure scanner is exhausted now + assertNull(rs.next()); + // renewLease should return false now + assertFalse(((AbstractClientScanner)rs).renewLease()); + rs.close(); + table.close(); + } +}