### Eclipse Workspace Patch 1.0 #P hbase 0.94 Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1343207) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -327,6 +327,18 @@ public Result [] next(long scannerId, int numberOfRows) throws IOException; /** + * Get the next set of values + * @param scannerId clientId passed to openScanner + * @param numberOfRows the number of rows to fetch + * @param callSeq the number which represents the sequence used by client scanner + * @return Array of Results (map of values); array is empty if done with this + * region and null if we are NOT to go to the next region (happens when a + * filter rules that the scan is done). + * @throws IOException e + */ + public Result[] next(long scannerId, int caching, long callSeq) throws IOException; + + /** * Close a scanner * * @param scannerId the scanner id returned by openScanner Index: src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 1343207) +++ src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -21,10 +21,12 @@ import java.io.IOException; import java.net.UnknownHostException; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -55,6 +57,8 @@ // indicate if it is a remote server call private boolean isRegionServerRemote = true; + private long callSeq = 0; + private boolean useCallSeq = true; /** * @param connection which connection @@ -123,7 +127,33 @@ try { incRPCcallsMetrics(); long timestamp = System.currentTimeMillis(); - rrs = server.next(scannerId, caching); + if (useCallSeq) { + try { + rrs = server.next(scannerId, caching, callSeq); + // increment the callSeq which will be getting used for the next time next() call to + // the RS.In case of a timeout this increment should not happen so that the next + // trial also will be done with the same callSeq. + callSeq++; + } catch (IOException ioe) { + // TODO This is an ugly way of checking. Any other ways? + if (ioe instanceof RemoteException + && ExceptionUtils.getStackTrace(ioe).contains("java.lang.NoSuchMethodException")) { + // This will happen when we use a latest version of the client but still running with + // old region server. At server side there is no implementation for the seq number + // based scanning. Set the useCallSeq to false. + LOG.info("Seq number based scan API not present at RS side! Trying with API: " + + "next(scannerId, caching). Consider upgrading version at RS " + + location.getHostnamePort()); + useCallSeq = false; + rrs = server.next(scannerId, caching); + } else { + // Throw it back so that will get handled by the below original catch blocks; + throw ioe; + } + } + } else { + rrs = server.next(scannerId, caching); + } if (logScannerActivity) { long now = System.currentTimeMillis(); if (now - timestamp > logCutOffLatency) { @@ -167,6 +197,11 @@ // when what we need is to open scanner against new location. // Attach RSSE to signal client that it needs to resetup scanner. throw new DoNotRetryIOException("Reset scanner", ioe); + } else if (ioe instanceof CallSequenceOutOfOrderException) { + // The callSeq from the client not matched with the one expected at the RS side + // This means the RS might have done extra scanning of data which is not received by the + // client.Throw a DNRE so that we close the current scanner and opens a new one with RS. + throw new DoNotRetryIOException("Reset scanner", ioe); } else { // The outer layers will retry throw ioe; Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1343207) +++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -293,7 +294,8 @@ } else { Throwable cause = e.getCause(); if (cause == null || (!(cause instanceof NotServingRegionException) - && !(cause instanceof RegionServerStoppedException))) { + && !(cause instanceof RegionServerStoppedException) + && !(cause instanceof CallSequenceOutOfOrderException))) { throw e; } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1343207) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.MasterAddressTracker; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableDescriptors; @@ -278,9 +279,9 @@ // flag set after we're done setting up server threads (used for testing) protected volatile boolean isOnline; - final Map scanners = - new ConcurrentHashMap(); - + final Map scanners = + new ConcurrentHashMap(); + // zookeeper connection and watcher private ZooKeeperWatcher zooKeeper; @@ -505,8 +506,8 @@ return NORMAL_QOS; // doh. } String scannerIdString = Long.toString(scannerId); - RegionScanner scanner = scanners.get(scannerIdString); - if (scanner != null && scanner.getRegionInfo().isMetaRegion()) { + RegionScannerHolder scannerHolder = scanners.get(scannerIdString); + if (scannerHolder != null && scannerHolder.s.getRegionInfo().isMetaRegion()) { // LOG.debug("High priority scanner request: " + scannerId); return HIGH_QOS; } @@ -918,9 +919,9 @@ private void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. - for (Map.Entry e : this.scanners.entrySet()) { + for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().close(); + e.getValue().s.close(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -2335,7 +2336,7 @@ long scannerId = -1L; scannerId = rand.nextLong(); String scannerName = String.valueOf(scannerId); - scanners.put(scannerName, s); + scanners.put(scannerName, new RegionScannerHolder(s)); this.leases.createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } @@ -2349,9 +2350,14 @@ } public Result[] next(final long scannerId, int nbRows) throws IOException { + return next(scannerId, nbRows, -1); + } + + public Result[] next(final long scannerId, int nbRows ,long callSeq) throws IOException { String scannerName = String.valueOf(scannerId); - RegionScanner s = this.scanners.get(scannerName); - if (s == null) throw new UnknownScannerException("Name: " + scannerName); + RegionScannerHolder rsh = this.scanners.get(scannerName); + if (rsh == null) throw new UnknownScannerException("Name: " + scannerName); + RegionScanner s = rsh.s; try { checkOpen(); } catch (IOException e) { @@ -2365,6 +2371,16 @@ } throw e; } + // if callSeq do not match throw Exception straight away. This needs to be performed even + // before checking of Lease. + // Old next() APIs which do not take callSeq will pass it as -1 and for that no + // need to match the callSeq from client and the one in server. + if (callSeq != -1 && callSeq != rsh.callSeq) { + throw new CallSequenceOutOfOrderException("Expected seq: " + rsh.callSeq + + " But the seq got from client: " + callSeq); + } + // Increment the callSeq value which is the next expected from client. + rsh.callSeq++; Leases.Lease lease = null; try { // Remove lease while its being processed in server; protects against case @@ -2442,26 +2458,25 @@ checkOpen(); requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); - RegionScanner s = scanners.get(scannerName); - + RegionScannerHolder rsh = scanners.get(scannerName); HRegion region = null; - if (s != null) { + if (rsh != null) { // call coprocessor. - region = getRegion(s.getRegionInfo().getRegionName()); + region = getRegion(rsh.s.getRegionInfo().getRegionName()); if (region != null && region.getCoprocessorHost() != null) { - if (region.getCoprocessorHost().preScannerClose(s)) { + if (region.getCoprocessorHost().preScannerClose(rsh.s)) { return; // bypass } } } - s = scanners.remove(scannerName); - if (s != null) { - s.close(); + rsh = scanners.remove(scannerName); + if (rsh != null) { + rsh.s.close(); this.leases.cancelLease(scannerName); if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerClose(s); + region.getCoprocessorHost().postScannerClose(rsh.s); } } } catch (Throwable t) { @@ -2481,8 +2496,9 @@ } public void leaseExpired() { - RegionScanner s = scanners.remove(this.scannerName); - if (s != null) { + RegionScannerHolder rsh = scanners.remove(this.scannerName); + if (rsh != null) { + RegionScanner s = rsh.s; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { @@ -3698,4 +3714,16 @@ mxBeanInfo); LOG.info("Registered RegionServer MXBean"); } + + /** + * Holder class which holds the RegionScanner and callSequence together. + */ + private static class RegionScannerHolder { + private RegionScanner s; + private long callSeq = 0L; + + public RegionScannerHolder(RegionScanner s) { + this.s = s; + } + } } Index: src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimesout.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimesout.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimesout.java (revision 0) @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestClientScannerRPCTimesout { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + private static final int SLAVES = 1; + private static final int rpcTimeout = 5 * 1000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimesout.class.getName()); + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScannerNextRPCTimesout() throws Exception { + byte[] TABLE = Bytes.toBytes("testScannerNextRPCTimesout"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + putToTable(ht, "row-1"); + putToTable(ht, "row-2"); + RegionServerWithScanTimesout.seqNoToSleepOn = 1; + Scan scan = new Scan(); + scan.setCaching(1); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertNotNull("Expected not null result", result); + result = scanner.next(); + assertNotNull("Expected not null result", result); + scanner.close(); + } + + private void putToTable(HTable ht, String rowkey) throws IOException { + Put put = new Put(rowkey.getBytes()); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + public static class RegionServerWithScanTimesout extends MiniHBaseClusterRegionServer { + private long tableScannerId; + private boolean slept; + private static long seqNoToSleepOn = -1; + + public RegionServerWithScanTimesout(Configuration conf) throws IOException, + InterruptedException { + super(conf); + } + + @Override + public long openScanner(byte[] regionName, Scan scan) throws IOException { + long scannerId = super.openScanner(regionName, scan); + if (!getRegionInfo(regionName).isMetaTable()) { + tableScannerId = scannerId; + } + return scannerId; + } + + @Override + public Result[] next(long scannerId, int nbRows, long callSeq) throws IOException { + if (!slept && this.tableScannerId == scannerId && seqNoToSleepOn == callSeq) { + try { + Thread.sleep(rpcTimeout + 500); + } catch (InterruptedException e) { + } + slept = true; + } + return super.next(scannerId, nbRows, callSeq); + } + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java (revision 0) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +/** + * Thrown by a region server while scan related next() calls. Both client and server maintain a + * callSequence and if the both do not match, RS will throw this exception. + */ +public class CallSequenceOutOfOrderException extends IOException { + + private static final long serialVersionUID = 1565946556907760065L; + + public CallSequenceOutOfOrderException() { + super(); + } + + public CallSequenceOutOfOrderException(String msg) { + super(msg); + } +}