Index: src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (revision 0) @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test the scenario where a next() call, while scanning, timeout at client side and getting retried. + * This scenario should not result in some data being skipped at RS side. + */ +@Category(MediumTests.class) +public class TestClientScannerRPCTimeout { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + private static final int SLAVES = 1; + private static final int rpcTimeout = 5 * 1000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScannerNextRPCTimesout() throws Exception { + byte[] TABLE = Bytes.toBytes("testScannerNextRPCTimesout"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + putToTable(ht, "row-1"); + putToTable(ht, "row-2"); + RegionServerWithScanTimeout.seqNoToSleepOn = 1; + Scan scan = new Scan(); + scan.setCaching(1); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertNotNull("Expected not null result", result); + result = scanner.next(); + assertNotNull("Expected not null result", result); + scanner.close(); + } + + private void putToTable(HTable ht, String rowkey) throws IOException { + Put put = new Put(rowkey.getBytes()); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer { + private long tableScannerId; + private boolean slept; + private static long seqNoToSleepOn = -1; + + public RegionServerWithScanTimeout(Configuration conf) throws IOException, + InterruptedException { + super(conf); + } + + @Override + public long openScanner(byte[] regionName, Scan scan) throws IOException { + long scannerId = super.openScanner(regionName, scan); + if (!getRegionInfo(regionName).isMetaTable()) { + tableScannerId = scannerId; + } + return scannerId; + } + + @Override + public Result[] next(long scannerId, int nbRows, long callSeq) throws IOException { + if (!slept && this.tableScannerId == scannerId && seqNoToSleepOn == callSeq) { + try { + Thread.sleep(rpcTimeout + 500); + } catch (InterruptedException e) { + } + slept = true; + } + return super.next(scannerId, nbRows, callSeq); + } + } +} Index: src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 1345709) +++ src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.List; @@ -81,7 +82,9 @@ throws IOException { HRegionServer server; try { - server = hrsc.getConstructor(Configuration.class).newInstance(c); + Constructor ctor = hrsc.getConstructor(Configuration.class); + ctor.setAccessible(true); + server = ctor.newInstance(c); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException(); throw new RuntimeException("Failed construction of RegionServer: " + Index: src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 1345709) +++ src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -21,10 +21,12 @@ import java.io.IOException; import java.net.UnknownHostException; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -55,6 +57,8 @@ // indicate if it is a remote server call private boolean isRegionServerRemote = true; + private long callSeq = 0; + private boolean useCallSeq = true; /** * @param connection which connection @@ -123,7 +127,33 @@ try { incRPCcallsMetrics(); long timestamp = System.currentTimeMillis(); - rrs = server.next(scannerId, caching); + if (useCallSeq) { + try { + rrs = server.next(scannerId, caching, callSeq); + // increment the callSeq which will be getting used for the next time next() call to + // the RS.In case of a timeout this increment should not happen so that the next + // trial also will be done with the same callSeq. + callSeq++; + } catch (IOException ioe) { + // TODO This is an ugly way of checking. Any other ways? + if (ioe instanceof RemoteException + && ExceptionUtils.getStackTrace(ioe).contains("java.lang.NoSuchMethodException")) { + // This will happen when we use a latest version of the client but still running with + // old region server. At server side there is no implementation for the seq number + // based scanning. Set the useCallSeq to false. + LOG.warn("Seq number based scan API not present at RS side! Trying with API: " + + "next(scannerId, caching). Consider upgrading version at RS " + + location.getHostnamePort()); + useCallSeq = false; + rrs = server.next(scannerId, caching); + } else { + // Throw it back so that will get handled by the below original catch blocks; + throw ioe; + } + } + } else { + rrs = server.next(scannerId, caching); + } if (logScannerActivity) { long now = System.currentTimeMillis(); if (now - timestamp > logCutOffLatency) { Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1345709) +++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -292,8 +293,9 @@ } } else { Throwable cause = e.getCause(); - if (cause == null || (!(cause instanceof NotServingRegionException) - && !(cause instanceof RegionServerStoppedException))) { + if ((cause == null || (!(cause instanceof NotServingRegionException) + && !(cause instanceof RegionServerStoppedException))) + && !(e instanceof CallSequenceOutOfOrderException)) { throw e; } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1345709) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -58,6 +58,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -279,8 +280,8 @@ // flag set after we're done setting up server threads (used for testing) protected volatile boolean isOnline; - final Map scanners = - new ConcurrentHashMap(); + final Map scanners = + new ConcurrentHashMap(); // zookeeper connection and watcher private ZooKeeperWatcher zooKeeper; @@ -506,8 +507,8 @@ return NORMAL_QOS; // doh. } String scannerIdString = Long.toString(scannerId); - RegionScanner scanner = scanners.get(scannerIdString); - if (scanner != null && scanner.getRegionInfo().isMetaRegion()) { + RegionScannerHolder scannerHolder = scanners.get(scannerIdString); + if (scannerHolder != null && scannerHolder.s.getRegionInfo().isMetaRegion()) { // LOG.debug("High priority scanner request: " + scannerId); return HIGH_QOS; } @@ -919,9 +920,9 @@ private void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. - for (Map.Entry e : this.scanners.entrySet()) { + for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().close(); + e.getValue().s.close(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -2336,7 +2337,7 @@ long scannerId = -1L; scannerId = rand.nextLong(); String scannerName = String.valueOf(scannerId); - scanners.put(scannerName, s); + scanners.put(scannerName, new RegionScannerHolder(s)); this.leases.createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } @@ -2348,11 +2349,16 @@ } return res[0]; } + + public Result[] next(final long scannerId, int nbRows) throws IOException { + return next(scannerId, nbRows, -1); + } - public Result[] next(final long scannerId, int nbRows) throws IOException { + public Result[] next(final long scannerId, int nbRows, long callSeq) throws IOException { String scannerName = String.valueOf(scannerId); - RegionScanner s = this.scanners.get(scannerName); - if (s == null) throw new UnknownScannerException("Name: " + scannerName); + RegionScannerHolder rsh = this.scanners.get(scannerName); + if (rsh == null) throw new UnknownScannerException("Name: " + scannerName); + RegionScanner s = rsh.s; try { checkOpen(); } catch (IOException e) { @@ -2366,6 +2372,17 @@ } throw e; } + // if callSeq does not match throw Exception straight away. This needs to be performed even + // before checking of Lease. + // Old next() APIs which do not take callSeq will pass it as -1 and for that no + // need to match the callSeq from client and the one in server. + if (callSeq != -1 && callSeq != rsh.callSeq) { + throw new CallSequenceOutOfOrderException("Expected seq: " + rsh.callSeq + + " But the seq got from client: " + callSeq); + } + // Increment the callSeq value which is the next expected from client. + rsh.callSeq++; + Leases.Lease lease = null; try { // Remove lease while its being processed in server; protects against case @@ -2443,26 +2460,26 @@ checkOpen(); requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); - RegionScanner s = scanners.get(scannerName); + RegionScannerHolder rsh = scanners.get(scannerName); HRegion region = null; - if (s != null) { + if (rsh != null) { // call coprocessor. - region = getRegion(s.getRegionInfo().getRegionName()); + region = getRegion(rsh.s.getRegionInfo().getRegionName()); if (region != null && region.getCoprocessorHost() != null) { - if (region.getCoprocessorHost().preScannerClose(s)) { + if (region.getCoprocessorHost().preScannerClose(rsh.s)) { return; // bypass } } } - s = scanners.remove(scannerName); - if (s != null) { - s.close(); + rsh = scanners.remove(scannerName); + if (rsh != null) { + rsh.s.close(); this.leases.cancelLease(scannerName); if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerClose(s); + region.getCoprocessorHost().postScannerClose(rsh.s); } } } catch (Throwable t) { @@ -2482,8 +2499,9 @@ } public void leaseExpired() { - RegionScanner s = scanners.remove(this.scannerName); - if (s != null) { + RegionScannerHolder rsh = scanners.remove(this.scannerName); + if (rsh != null) { + RegionScanner s = rsh.s; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { @@ -3714,4 +3732,16 @@ HRegionInfo info = region.getRegionInfo(); return CompactionRequest.getCompactionState(info.getRegionId()).name(); } + + /** + * Holder class which holds the RegionScanner and callSequence together. + */ + private static class RegionScannerHolder { + private RegionScanner s; + private long callSeq = 0L; + + public RegionScannerHolder(RegionScanner s) { + this.s = s; + } + } } Index: src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java (revision 0) @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +/** + * Thrown by a region server while doing scan related next() calls. Both client and server maintain a + * callSequence and if they do not match, RS will throw this exception. + */ +public class CallSequenceOutOfOrderException extends DoNotRetryIOException { + + private static final long serialVersionUID = 1565946556907760065L; + + public CallSequenceOutOfOrderException() { + super(); + } + + public CallSequenceOutOfOrderException(String msg) { + super(msg); + } +} Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1345709) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -328,6 +328,18 @@ public Result [] next(long scannerId, int numberOfRows) throws IOException; /** + * Get the next set of values + * @param scannerId clientId passed to openScanner + * @param numberOfRows the number of rows to fetch + * @param callSeq the number which represents the sequence used by client scanner + * @return Array of Results (map of values); array is empty if done with this + * region and null if we are NOT to go to the next region (happens when a + * filter rules that the scan is done). + * @throws IOException e + */ + public Result[] next(long scannerId, int caching, long callSeq) throws IOException; + + /** * Close a scanner * * @param scannerId the scanner id returned by openScanner