Index: hbase-server/src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java (revision 0) @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +/** + * Thrown by a region server while doing scan() calls. Both client and server maintain a + * callSequence and if they do not match, RS will throw this exception. 
+ */ +public class CallSequenceOutOfOrderException extends DoNotRetryIOException { + + private static final long serialVersionUID = -1625098136365979523L; + + public CallSequenceOutOfOrderException() { + super(); + } + + public CallSequenceOutOfOrderException(String msg) { + super(msg); + } +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1345869) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -296,8 +297,9 @@ } } else { Throwable cause = e.getCause(); - if (cause == null || (!(cause instanceof NotServingRegionException) - && !(cause instanceof RegionServerStoppedException))) { + if ((cause == null || (!(cause instanceof NotServingRegionException) + && !(cause instanceof RegionServerStoppedException))) + && !(e instanceof CallSequenceOutOfOrderException)) { throw e; } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 1345869) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -67,6 +67,7 @@ // indicate if it is a remote server call private boolean isRegionServerRemote = true; + private long callSeq = 0; /** * @param connection which connection @@ -138,9 +139,13 @@ try 
{ incRPCcallsMetrics(); ScanRequest request = - RequestConverter.buildScanRequest(scannerId, caching, false); + RequestConverter.buildScanRequest(scannerId, caching, false, callSeq); try { ScanResponse response = server.scan(null, request); + // Increment the callSeq which will be used for the next scan() call to + // the RS. In case of a timeout this increment should not happen so that the next + // trial also will be done with the same callSeq. + callSeq++; long timestamp = System.currentTimeMillis(); rrs = ResponseConverter.getResults(response); if (logScannerActivity) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (revision 1345869) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (working copy) @@ -10305,6 +10305,10 @@ // optional bool closeScanner = 5; boolean hasCloseScanner(); boolean getCloseScanner(); + + // optional uint64 callSeq = 6; + boolean hasCallSeq(); + long getCallSeq(); } public static final class ScanRequest extends com.google.protobuf.GeneratedMessage @@ -10391,12 +10395,23 @@ return closeScanner_; } + // optional uint64 callSeq = 6; + public static final int CALLSEQ_FIELD_NUMBER = 6; + private long callSeq_; + public boolean hasCallSeq() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getCallSeq() { + return callSeq_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); scannerId_ = 0L; numberOfRows_ = 0; closeScanner_ = false; + callSeq_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10437,6 +10452,9 @@ if (((bitField0_ & 0x00000010) == 
0x00000010)) { output.writeBool(5, closeScanner_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeUInt64(6, callSeq_); + } getUnknownFields().writeTo(output); } @@ -10466,6 +10484,10 @@ size += com.google.protobuf.CodedOutputStream .computeBoolSize(5, closeScanner_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(6, callSeq_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10514,6 +10536,11 @@ result = result && (getCloseScanner() == other.getCloseScanner()); } + result = result && (hasCallSeq() == other.hasCallSeq()); + if (hasCallSeq()) { + result = result && (getCallSeq() + == other.getCallSeq()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10543,6 +10570,10 @@ hash = (37 * hash) + CLOSESCANNER_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getCloseScanner()); } + if (hasCallSeq()) { + hash = (37 * hash) + CALLSEQ_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getCallSeq()); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -10679,6 +10710,8 @@ bitField0_ = (bitField0_ & ~0x00000008); closeScanner_ = false; bitField0_ = (bitField0_ & ~0x00000010); + callSeq_ = 0L; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -10745,6 +10778,10 @@ to_bitField0_ |= 0x00000010; } result.closeScanner_ = closeScanner_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.callSeq_ = callSeq_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10776,6 +10813,9 @@ if (other.hasCloseScanner()) { setCloseScanner(other.getCloseScanner()); } + if (other.hasCallSeq()) { + setCallSeq(other.getCallSeq()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -10852,6 +10892,11 @@ closeScanner_ = input.readBool(); break; } + case 48: { + bitField0_ |= 0x00000020; + callSeq_ = 
input.readUInt64(); + break; + } } } } @@ -11101,6 +11146,27 @@ return this; } + // optional uint64 callSeq = 6; + private long callSeq_ ; + public boolean hasCallSeq() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getCallSeq() { + return callSeq_; + } + public Builder setCallSeq(long value) { + bitField0_ |= 0x00000020; + callSeq_ = value; + onChanged(); + return this; + } + public Builder clearCallSeq() { + bitField0_ = (bitField0_ & ~0x00000020); + callSeq_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -21428,49 +21494,49 @@ "\ttimeRange\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVersi" + "ons\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true\022" + "\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001(" + - "\004\"\203\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regi" + + "\004\"\224\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regi" + "onSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscan" + "nerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014clo" + - "seScanner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006resul" + - "t\030\001 \003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n\013m" + - "oreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockRow", - "Request\022 \n\006region\030\001 \002(\0132\020.RegionSpecifie" + - "r\022\013\n\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006loc" + - "kId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowReque" + - "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006" + - "lockId\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024Bu" + - "lkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.Reg" + - "ionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkL" + - 
"oadHFileRequest.FamilyPath\032*\n\nFamilyPath" + - "\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLo" + - "adHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec", - "\022\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nm" + - "ethodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.Name" + - "StringPair\022!\n\tparameter\030\005 \003(\0132\016.NameByte" + - "sPair\"O\n\026ExecCoprocessorRequest\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132" + - "\005.Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005val" + - "ue\030\001 \002(\0132\016.NameBytesPair\"N\n\013MultiAction\022" + - "\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004." + - "Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult" + - "\022\035\n\005value\030\001 \001(\0132\016.NameBytesPair\022!\n\texcep", - "tion\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiReque" + - "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006" + - "action\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001" + - "(\010\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Ac" + - "tionResult2\221\003\n\rClientService\022 \n\003get\022\013.Ge" + - "tRequest\032\014.GetResponse\022)\n\006mutate\022\016.Mutat" + - "eRequest\032\017.MutateResponse\022#\n\004scan\022\014.Scan" + - "Request\032\r.ScanResponse\022,\n\007lockRow\022\017.Lock" + - "RowRequest\032\020.LockRowResponse\0222\n\tunlockRo" + - "w\022\021.UnlockRowRequest\032\022.UnlockRowResponse", - "\022>\n\rbulkLoadHFile\022\025.BulkLoadHFileRequest" + - "\032\026.BulkLoadHFileResponse\022D\n\017execCoproces" + - "sor\022\027.ExecCoprocessorRequest\032\030.ExecCopro" + - "cessorResponse\022&\n\005multi\022\r.MultiRequest\032\016" + - ".MultiResponseBB\n*org.apache.hadoop.hbas" + - 
"e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" + - "\001\001" + "seScanner\030\005 \001(\010\022\017\n\007callSeq\030\006 \001(\004\"\\\n\014Scan" + + "Response\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\tsca" + + "nnerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl", + "\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002(\013" + + "2\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017LockR" + + "owResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"" + + "D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020.Re" + + "gionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021UnlockR" + + "owResponse\"\232\001\n\024BulkLoadHFileRequest\022 \n\006r" + + "egion\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamilyP" + + "ath\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyP" + + "ath\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pat" + + "h\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006load", + "ed\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003row\030\001 \002(\014\022\024\n\014protoc" + + "olName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022!\n\010prop" + + "erty\030\004 \003(\0132\017.NameStringPair\022!\n\tparameter" + + "\030\005 \003(\0132\016.NameBytesPair\"O\n\026ExecCoprocesso" + + "rRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi" + + "er\022\023\n\004call\030\002 \002(\0132\005.Exec\"8\n\027ExecCoprocess" + + "orResponse\022\035\n\005value\030\001 \002(\0132\016.NameBytesPai" + + "r\"N\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007.Mutat" + + "e\022\021\n\003get\030\002 \001(\0132\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Exe" + + "c\"P\n\014ActionResult\022\035\n\005value\030\001 \001(\0132\016.NameB", + "ytesPair\022!\n\texception\030\002 \001(\0132\016.NameBytesP" + + "air\"^\n\014MultiRequest\022 \n\006region\030\001 
\002(\0132\020.Re" + + "gionSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiAct" + + "ion\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006" + + "result\030\001 \003(\0132\r.ActionResult2\221\003\n\rClientSe" + + "rvice\022 \n\003get\022\013.GetRequest\032\014.GetResponse\022" + + ")\n\006mutate\022\016.MutateRequest\032\017.MutateRespon" + + "se\022#\n\004scan\022\014.ScanRequest\032\r.ScanResponse\022" + + ",\n\007lockRow\022\017.LockRowRequest\032\020.LockRowRes" + + "ponse\0222\n\tunlockRow\022\021.UnlockRowRequest\032\022.", + "UnlockRowResponse\022>\n\rbulkLoadHFile\022\025.Bul" + + "kLoadHFileRequest\032\026.BulkLoadHFileRespons" + + "e\022D\n\017execCoprocessor\022\027.ExecCoprocessorRe" + + "quest\032\030.ExecCoprocessorResponse\022&\n\005multi" + + "\022\r.MultiRequest\032\016.MultiResponseBB\n*org.a" + + "pache.hadoop.hbase.protobuf.generatedB\014C" + + "lientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -21578,7 +21644,7 @@ internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", }, + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "CallSeq", }, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.Builder.class); internal_static_ScanResponse_descriptor = Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (revision 1345869) +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (working copy) @@ -409,6 +409,25 @@ builder.setScannerId(scannerId); return builder.build(); } + + /** + * Create a protocol buffer ScanRequest for a scanner id + * + * @param scannerId + * @param numberOfRows + * @param closeScanner + * @param callSeq + * @return a scan request + */ + public static ScanRequest buildScanRequest(final long scannerId, + final int numberOfRows, final boolean closeScanner, final long callSeq) { + ScanRequest.Builder builder = ScanRequest.newBuilder(); + builder.setNumberOfRows(numberOfRows); + builder.setCloseScanner(closeScanner); + builder.setScannerId(scannerId); + builder.setCallSeq(callSeq); + return builder.build(); + } /** * Create a protocol buffer LockRowRequest Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1345869) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -271,8 +272,8 @@ // Compactions public CompactSplitThread compactSplitThread; - final Map scanners = - new ConcurrentHashMap(); + final Map scanners = + new ConcurrentHashMap(); /** * Map of regions currently being served by this region server. Key is the @@ -576,8 +577,8 @@ return NORMAL_QOS; // doh. 
} String scannerIdString = Long.toString(scannerId); - RegionScanner scanner = scanners.get(scannerIdString); - if (scanner != null && scanner.getRegionInfo().isMetaRegion()) { + RegionScannerHolder scannerHolder = scanners.get(scannerIdString); + if (scannerHolder != null && scannerHolder.s.getRegionInfo().isMetaRegion()) { // LOG.debug("High priority scanner request: " + scannerId); return HIGH_QOS; } @@ -1007,9 +1008,9 @@ private void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. - for (Map.Entry e : this.scanners.entrySet()) { + for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().close(); + e.getValue().s.close(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -2362,8 +2363,9 @@ } public void leaseExpired() { - RegionScanner s = scanners.remove(this.scannerName); - if (s != null) { + RegionScannerHolder rsh = scanners.remove(this.scannerName); + if (rsh != null) { + RegionScanner s = rsh.s; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { @@ -2662,7 +2664,7 @@ protected long addScanner(RegionScanner s) throws LeaseStillHeldException { long scannerId = nextLong(); String scannerName = String.valueOf(scannerId); - scanners.put(scannerName, s); + scanners.put(scannerName, new RegionScannerHolder(s)); this.leases.createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } @@ -2889,6 +2891,7 @@ int ttl = 0; HRegion region = null; RegionScanner scanner = null; + RegionScannerHolder rsh = null; boolean moreResults = true; boolean closeScanner = false; ScanResponse.Builder builder = ScanResponse.newBuilder(); @@ -2900,11 +2903,12 @@ rows = request.getNumberOfRows(); } if (request.hasScannerId()) { - scanner = scanners.get(scannerName); - if (scanner == null) { + rsh = scanners.get(scannerName); + if (rsh == null) { throw new 
UnknownScannerException( "Name: " + scannerName + ", already closed?"); } + scanner = rsh.s; region = getRegion(scanner.getRegionInfo().getRegionName()); } else { region = getRegion(request.getRegion()); @@ -2926,6 +2930,19 @@ } if (rows > 0) { + // if callSeq does not match throw Exception straight away. This needs to be performed + // even before checking of Lease. + if (rsh == null) { + rsh = scanners.get(scannerName); + } + if (rsh != null) { + if (request.getCallSeq() != rsh.callSeq) { + throw new CallSequenceOutOfOrderException("Expected seq: " + rsh.callSeq + + " But the seq got from client: " + request.getCallSeq()); + } + // Increment the callSeq value which is the next expected from client. + rsh.callSeq++; + } try { // Remove lease while its being processed in server; protects against case // where processing of request takes > lease expiration time. @@ -3009,8 +3026,9 @@ return builder.build(); // bypass } } - scanner = scanners.remove(scannerName); - if (scanner != null) { + rsh = scanners.remove(scannerName); + if (rsh != null) { + scanner = rsh.s; scanner.close(); leases.cancelLease(scannerName); if (region != null && region.getCoprocessorHost() != null) { @@ -3881,4 +3899,16 @@ private String getMyEphemeralNodePath() { return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString()); } + + /** + * Holder class which holds the RegionScanner and callSequence together. 
+ */ + private static class RegionScannerHolder { + private RegionScanner s; + private long callSeq = 0L; + + public RegionScannerHolder(RegionScanner s) { + this.s = s; + } + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 1345869) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.List; @@ -83,7 +84,9 @@ throws IOException { HRegionServer server; try { - server = hrsc.getConstructor(Configuration.class).newInstance(c); + Constructor ctor = hrsc.getConstructor(Configuration.class); + ctor.setAccessible(true); + server = ctor.newInstance(c); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException(); throw new RuntimeException("Failed construction of RegionServer: " + Index: hbase-server/src/protobuf/Client.proto =================================================================== --- hbase-server/src/protobuf/Client.proto (revision 1345869) +++ hbase-server/src/protobuf/Client.proto (working copy) @@ -214,6 +214,7 @@ optional uint64 scannerId = 3; optional uint32 numberOfRows = 4; optional bool closeScanner = 5; + optional uint64 callSeq = 6; } /** Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (revision 0) @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Test the scenario where a scan() call, while scanning, times out at client side and gets + * retried. This scenario should not result in data being skipped at RS side. 
+ */ +@Category(MediumTests.class) +public class TestClientScannerRPCTimeout { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + private static final int SLAVES = 1; + private static final int rpcTimeout = 5 * 1000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScannerNextRPCTimesout() throws Exception { + byte[] TABLE = Bytes.toBytes("TestClientScannerRPCTimeout"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + putToTable(ht, "row-1"); + putToTable(ht, "row-2"); + RegionServerWithScanTimeout.seqNoToSleepOn = 1; + Scan scan = new Scan(); + scan.setCaching(1); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertNotNull("Expected not null result", result); + result = scanner.next(); + assertNotNull("Expected not null result", result); + scanner.close(); + } + + private void putToTable(HTable ht, String rowkey) throws IOException { + Put put = new Put(rowkey.getBytes()); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer { + private long tableScannerId; + private boolean slept; + private static long seqNoToSleepOn = -1; + + public RegionServerWithScanTimeout(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + @Override + public ScanResponse 
scan(final RpcController controller, final ScanRequest request) + throws ServiceException { + if (request.hasScannerId()) { + if (!slept && this.tableScannerId == request.getScannerId() + && seqNoToSleepOn == request.getCallSeq()) { + try { + Thread.sleep(rpcTimeout + 500); + } catch (InterruptedException e) { + } + slept = true; + } + return super.scan(controller, request); + } else { + ScanResponse scanRes = super.scan(controller, request); + String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); + if (!regionName.contains("-ROOT-") && !regionName.contains(".META.")) { + tableScannerId = scanRes.getScannerId(); + } + return scanRes; + } + } + } +}