Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1338283) +++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ScannerCookieOutOfOrderException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -293,7 +294,8 @@ } else { Throwable cause = e.getCause(); if (cause == null || (!(cause instanceof NotServingRegionException) - && !(cause instanceof RegionServerStoppedException))) { + && !(cause instanceof RegionServerStoppedException) + && !(cause instanceof ScannerCookieOutOfOrderException))) { throw e; } } Index: src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 1338283) +++ src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ScannerCookieOutOfOrderException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; @@ -47,7 +48,8 @@ // indicate if it is a remote server call private boolean isRegionServerRemote = true; - + private long cookie = 0; + /** * @param connection which connection * @param tableName table callable is on @@ -111,7 +113,11 @@ Result [] rrs = null; try { incRPCcallsMetrics(); - rrs = server.next(scannerId, caching); + rrs = server.next(scannerId, caching,cookie); + // increment the cookie which will get used for the next time next() call to the RS. + // In case of a timeout this increment should not happen so that the next trial also + // will be done with the same cookie value. + cookie++; updateResultsMetrics(rrs); } catch (IOException e) { IOException ioe = null; @@ -132,6 +138,11 @@ // when what we need is to open scanner against new location. // Attach RSSE to signal client that it needs to resetup scanner. throw new DoNotRetryIOException("Reset scanner", ioe); + } else if (ioe instanceof ScannerCookieOutOfOrderException) { + // The cookie from the client do not match with the one expected at the RS side + // This means the RS has done extra scanning of data which is not received by the client. + // Throw a DNRE so that we close the current scanner and opens a new one with RS. + throw new DoNotRetryIOException("Reset scanner", ioe); } else { // The outer layers will retry throw ioe; Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1338283) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -327,6 +327,18 @@ public Result [] next(long scannerId, int numberOfRows) throws IOException; /** + * Get the next set of values + * @param scannerId clientId passed to openScanner + * @param numberOfRows the number of rows to fetch + * @param cookie the cookie which represents the sequence number used by client scanner + * @return Array of Results (map of values); array is empty if done with this + * region and null if we are NOT to go to the next region (happens when a + * filter rules that the scan is done). + * @throws IOException e + */ + public Result[] next(long scannerId, int caching, long cookie) throws IOException; + + /** * Close a scanner * * @param scannerId the scanner id returned by openScanner Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1338283) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.MasterAddressTracker; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ScannerCookieOutOfOrderException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableDescriptors; @@ -278,9 +279,9 @@ // flag set after we're done setting up server threads (used for testing) protected volatile boolean isOnline; - final Map scanners = - new ConcurrentHashMap(); - + final Map scanners = + new ConcurrentHashMap(); + // zookeeper connection and watcher private ZooKeeperWatcher zooKeeper; @@ -505,7 +506,7 @@ return NORMAL_QOS; // doh. } String scannerIdString = Long.toString(scannerId); - RegionScanner scanner = scanners.get(scannerIdString); + RegionScanner scanner = scanners.get(scannerIdString).s; if (scanner != null && scanner.getRegionInfo().isMetaRegion()) { // LOG.debug("High priority scanner request: " + scannerId); return HIGH_QOS; @@ -918,9 +919,9 @@ private void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. - for (Map.Entry e : this.scanners.entrySet()) { + for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().close(); + e.getValue().s.close(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -2335,7 +2336,8 @@ long scannerId = -1L; scannerId = rand.nextLong(); String scannerName = String.valueOf(scannerId); - scanners.put(scannerName, s); + RegionScannerWithCookie rswc = new RegionScannerWithCookie(s); + scanners.put(scannerName, rswc); this.leases.createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } @@ -2349,9 +2351,14 @@ } public Result[] next(final long scannerId, int nbRows) throws IOException { + return next(scannerId, nbRows, -1); + } + + public Result[] next(final long scannerId, int nbRows, long cookie) throws IOException { String scannerName = String.valueOf(scannerId); - RegionScanner s = this.scanners.get(scannerName); - if (s == null) throw new UnknownScannerException("Name: " + scannerName); + RegionScannerWithCookie rswc = this.scanners.get(scannerName); + if (rswc == null) throw new UnknownScannerException("Name: " + scannerName); + RegionScanner s = rswc.s; try { checkOpen(); } catch (IOException e) { @@ -2365,6 +2372,18 @@ } throw e; } + // if cookie do not match throw Exception straight away. This needs to be performed even + // before checking of Lease. + // Old next() APIs which do not take cookie will pass a cookie value as -1 and for that no + // need to match the cookie from client and the one in server. + if (-1 != cookie && cookie != rswc.cookie) { + this.scanners.remove(scannerName); + this.leases.cancelLease(scannerName); + throw new ScannerCookieOutOfOrderException("Cookie from client and cookie at server do not match"); + } + // Increment the cookie value which is the next expected from client. + rswc.cookie++; Leases.Lease lease = null; try { // Remove lease while its being processed in server; protects against case @@ -2442,26 +2461,25 @@ checkOpen(); requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); - RegionScanner s = scanners.get(scannerName); - + RegionScannerWithCookie rswc = scanners.get(scannerName); HRegion region = null; - if (s != null) { + if (rswc != null) { // call coprocessor. - region = getRegion(s.getRegionInfo().getRegionName()); + region = getRegion(rswc.s.getRegionInfo().getRegionName()); if (region != null && region.getCoprocessorHost() != null) { - if (region.getCoprocessorHost().preScannerClose(s)) { + if (region.getCoprocessorHost().preScannerClose(rswc.s)) { return; // bypass } } } - s = scanners.remove(scannerName); - if (s != null) { - s.close(); + rswc = scanners.remove(scannerName); + if (rswc != null) { + rswc.s.close(); this.leases.cancelLease(scannerName); if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerClose(s); + region.getCoprocessorHost().postScannerClose(rswc.s); } } } catch (Throwable t) { @@ -2481,8 +2499,9 @@ } public void leaseExpired() { - RegionScanner s = scanners.remove(this.scannerName); - if (s != null) { + RegionScannerWithCookie rswc = scanners.remove(this.scannerName); + if (rswc != null) { + RegionScanner s = rswc.s; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { @@ -3696,4 +3715,16 @@ mxBeanInfo); LOG.info("Registered RegionServer MXBean"); } + + /** + * Holder class which holds the RegionScanner and cookie together. + */ + private static class RegionScannerWithCookie { + private RegionScanner s; + private long cookie = 0L; + + public RegionScannerWithCookie(RegionScanner s) { + this.s = s; + } + } } Index: src/main/java/org/apache/hadoop/hbase/ScannerCookieOutOfOrderException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ScannerCookieOutOfOrderException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ScannerCookieOutOfOrderException.java (working copy) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +/** + * Can be thrown by a region server while scan related next() calls. Both client and server + * maintain a cookie and if the both do not match, RS will throw this exception. + */ +public class ScannerCookieOutOfOrderException extends IOException { + + private static final long serialVersionUID = 1565946556907760065L; + + public ScannerCookieOutOfOrderException() { + super(); + } + + public ScannerCookieOutOfOrderException(String msg) { + super(msg); + } +} Index: src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimesout.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimesout.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimesout.java (working copy) @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestClientScannerRPCTimesout { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + private static final int SLAVES = 1; + private static final int rpcTimeout = 5 * 1000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimesout.class.getName()); + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScannerNextRPCTimesout() throws Exception { + byte[] TABLE = Bytes.toBytes("testScannerNextRPCTimesout"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + putToTable(ht, "row-1"); + putToTable(ht, "row-2"); + RegionServerWithScanTimesout.cookieToSleepOn = 1; + Scan scan = new Scan(); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertNotNull("Expected not null result", result); + result = scanner.next(); + assertNotNull("Expected not null result", result); + scanner.close(); + } + + private void putToTable(HTable ht, String rowkey) throws IOException { + Put put = new Put(rowkey.getBytes()); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + public static class RegionServerWithScanTimesout extends MiniHBaseClusterRegionServer { + private long tableScannerId; + private boolean slept; + private static long cookieToSleepOn = -1; + + public RegionServerWithScanTimesout(Configuration conf) throws IOException, + InterruptedException { + super(conf); + } + + @Override + public long openScanner(byte[] regionName, Scan scan) throws IOException { + long scannerId = super.openScanner(regionName, scan); + if (!getRegionInfo(regionName).isMetaTable()) { + tableScannerId = scannerId; + } + return scannerId; + } + + @Override + public Result[] next(long scannerId, int nbRows, long cookie) throws IOException { + if (this.tableScannerId == scannerId && cookieToSleepOn == cookie && !slept) { + try { + Thread.sleep(rpcTimeout + 500); + } catch (InterruptedException e) { + } + slept = true; + } + return super.next(scannerId, nbRows, cookie); + } + } +}