Index: src/test/org/apache/hadoop/hbase/client/TestForceSplit.java =================================================================== --- src/test/org/apache/hadoop/hbase/client/TestForceSplit.java (revision 798510) +++ src/test/org/apache/hadoop/hbase/client/TestForceSplit.java (working copy) @@ -21,7 +21,10 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseClusterTestCase; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; @@ -34,6 +37,7 @@ * Tests forced splitting of HTable */ public class TestForceSplit extends HBaseClusterTestCase { + static final Log LOG = LogFactory.getLog(TestForceSplit.class); private static final byte[] tableName = Bytes.toBytes("test"); private static final byte[] columnName = Bytes.toBytes("a:"); @@ -44,7 +48,7 @@ } /** - * the test + * Tests forcing split from client and having scanners successfully ride over split. * @throws Exception * @throws IOException */ @@ -55,7 +59,7 @@ htd.addFamily(new HColumnDescriptor(columnName)); HBaseAdmin admin = new HBaseAdmin(conf); admin.createTable(htd); - HTable table = new HTable(conf, tableName); + final HTable table = new HTable(conf, tableName); byte[] k = new byte[3]; int rowCount = 0; for (byte b1 = 'a'; b1 < 'z'; b1++) { @@ -88,31 +92,50 @@ scanner.close(); assertEquals(rowCount, rows); + // Have an outstanding scan going on to make sure we can scan over splits. + scan = new Scan(); + scanner = table.getScanner(scan); + // Scan first row so we are into first region before split happens. 
+ scanner.next(); + + final AtomicInteger count = new AtomicInteger(0); + Thread t = new Thread("CheckForSplit") { + public void run() { + for (int i = 0; i < 20; i++) { + try { + sleep(1000); + } catch (InterruptedException e) { + continue; + } + // check again table = new HTable(conf, tableName); + Map regions = null; + try { + regions = table.getRegionsInfo(); + } catch (IOException e) { + e.printStackTrace(); + } + if (regions == null) continue; + count.set(regions.size()); + if (count.get() >= 2) break; + LOG.debug("Cycle waiting on split"); + } + } + }; + t.start(); // tell the master to split the table admin.split(Bytes.toString(tableName)); + t.join(); - // give some time for the split to happen - Thread.sleep(15 * 1000); - - // check again table = new HTable(conf, tableName); - m = table.getRegionsInfo(); - System.out.println("Regions after split (" + m.size() + "): " + m); - // should have two regions now - assertTrue(m.size() == 2); - // Verify row count - scan = new Scan(); - scanner = table.getScanner(scan); - rows = 0; - for(Result result : scanner) { + rows = 1; // We counted one row above. 
+ for (Result result : scanner) { rows++; if(rows > rowCount) { scanner.close(); - assertTrue("Have already scanned more rows than expected (" + - rowCount + ")", false); + assertTrue("Scanned more than expected (" + rowCount + ")", false); } } scanner.close(); assertEquals(rowCount, rows); } -} +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 798510) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -803,7 +803,7 @@ */ private Throwable cleanup(final Throwable t, final String msg) { if (msg == null) { - LOG.error(RemoteExceptionHandler.checkThrowable(t)); + LOG.error("", RemoteExceptionHandler.checkThrowable(t)); } else { LOG.error(msg, RemoteExceptionHandler.checkThrowable(t)); } @@ -1890,7 +1890,7 @@ public Result [] next(final long scannerId, int nbRows) throws IOException { try { String scannerName = String.valueOf(scannerId); - InternalScanner s = scanners.get(scannerName); + InternalScanner s = this.scanners.get(scannerName); if (s == null) { throw new UnknownScannerException("Name: " + scannerName); } @@ -1918,6 +1918,9 @@ } return results.toArray(new Result[0]); } catch (Throwable t) { + if (t instanceof NotServingRegionException) { + this.scanners.remove(scannerId); + } throw convertThrowableToIOE(cleanup(t)); } } Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 798510) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -110,7 +110,7 @@ * master as a region to close if the carrying regionserver is overloaded. * Once set, it is never cleared. 
*/ - private final AtomicBoolean closing = new AtomicBoolean(false); + final AtomicBoolean closing = new AtomicBoolean(false); private final RegionHistorian historian; ////////////////////////////////////////////////////////////////////////////// @@ -1671,8 +1671,6 @@ return this.basedir; } - - //TODO /** * RegionScanner is an iterator through a bunch of rows in an HRegion. *

@@ -1710,9 +1708,15 @@ * Get the next row of results from this region. * @param results list to append results to * @return true if there are more rows, false if scanner is done + * @throws NotServingRegionException If this region is closing or closed */ public boolean next(List results) throws IOException { + if (closing.get() || closed.get()) { + close(); + throw new NotServingRegionException(regionInfo.getRegionNameAsString() + + " is closing=" + closing.get() + " or closed=" + closed.get()); + } // This method should probably be reorganized a bit... has gotten messy KeyValue kv = this.storeHeap.peek(); if (kv == null) { Index: src/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HTable.java (revision 798510) +++ src/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.filter.RowFilterInterface; @@ -1778,13 +1779,18 @@ */ protected class ClientScanner implements ResultScanner { private final Log CLIENT_LOG = LogFactory.getLog(this.getClass()); + // HEADSUP: The scan internal start row can change as we move through table. private Scan scan; private boolean closed = false; + // Current region scanner is against. Gets cleared if current region goes + // wonky: e.g. if it splits on us. private HRegionInfo currentRegion = null; private ScannerCallable callable = null; private final LinkedList cache = new LinkedList(); private final int scannerCaching = HTable.this.scannerCaching; private long lastNext; + // Keep lastResult returned successfully in case we have to reset scanner. 
+ private Result lastResult = null; protected ClientScanner(final Scan scan) { if (CLIENT_LOG.isDebugEnabled()) { @@ -1815,9 +1821,11 @@ return lastNext; } - /* - * Gets a scanner for the next region. - * Returns false if there are no more scanners. + /* + * Gets a scanner for the next region. If this.currentRegion != null, then + * we will move to the endrow of this.currentRegion. Else we will get + * scanner at the scan.getStartRow(). + * @param nbRows */ private boolean nextScanner(int nbRows) throws IOException { // Close the previous scanner if it's open @@ -1826,38 +1834,38 @@ getConnection().getRegionServerWithRetries(callable); this.callable = null; } - + + // Where to start the next scanner + byte [] localStartKey = null; + // if we're at the end of the table, then close and return false // to stop iterating - if (currentRegion != null) { + if (this.currentRegion != null) { if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Advancing forward from region " + currentRegion); + CLIENT_LOG.debug("Finished with region " + this.currentRegion); } - - byte [] endKey = currentRegion.getEndKey(); + byte [] endKey = this.currentRegion.getEndKey(); if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || filterSaysStop(endKey)) { close(); return false; } - } + localStartKey = endKey; + } else { + localStartKey = this.scan.getStartRow(); + } - HRegionInfo oldRegion = this.currentRegion; - byte [] localStartKey = - oldRegion == null ? 
scan.getStartRow() : oldRegion.getEndKey(); - if (CLIENT_LOG.isDebugEnabled()) { CLIENT_LOG.debug("Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); - } - + } try { callable = getScannerCallable(localStartKey, nbRows); - // open a scanner on the region server starting at the + // Open a scanner on the region server starting at the // beginning of the region getConnection().getRegionServerWithRetries(callable); - currentRegion = callable.getHRegionInfo(); + this.currentRegion = callable.getHRegionInfo(); } catch (IOException e) { close(); throw e; @@ -1867,8 +1875,9 @@ protected ScannerCallable getScannerCallable(byte [] localStartKey, int nbRows) { + scan.setStartRow(localStartKey); ScannerCallable s = new ScannerCallable(getConnection(), - getTableName(), localStartKey, scan); + getTableName(), scan); s.setCaching(nbRows); return s; } @@ -1922,6 +1931,12 @@ do { try { values = getConnection().getRegionServerWithRetries(callable); + } catch (NotServingRegionException e) { + // Update start row. + this.scan.setStartRow(this.lastResult.getRow()); + // Clear region as flag to nextScanner to use this.scan.startRow. 
+ this.currentRegion = null; + continue; } catch (IOException e) { if (e instanceof UnknownScannerException && lastNext + scannerTimeout < System.currentTimeMillis()) { @@ -1936,6 +1951,7 @@ for (Result rs : values) { cache.add(rs); countdown--; + this.lastResult = rs; } } } while (countdown > 0 && nextScanner(countdown)); @@ -1965,7 +1981,7 @@ } return resultSets.toArray(new Result[resultSets.size()]); } - + public void close() { if (callable != null) { callable.setClose(); @@ -1998,7 +2014,7 @@ return next != null; } catch (IOException e) { throw new RuntimeException(e); - } + } } return true; } Index: src/java/org/apache/hadoop/hbase/client/MetaScanner.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/MetaScanner.java (revision 798510) +++ src/java/org/apache/hadoop/hbase/client/MetaScanner.java (working copy) @@ -49,7 +49,7 @@ ScannerCallable callable = null; do { Scan scan = new Scan(startRow).addFamily(CATALOG_FAMILY); - callable = new ScannerCallable(connection, META_TABLE_NAME, scan.getStartRow(), scan); + callable = new ScannerCallable(connection, META_TABLE_NAME, scan); // Open scanner connection.getRegionServerWithRetries(callable); try { Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 798510) +++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; @@ -94,7 +95,6 @@ if (connection == null) { 
connection = new TableServers(conf); HBASE_INSTANCES.put(conf, connection); - LOG.debug("Created new HBASE_INSTANCES"); } } return connection; @@ -353,8 +353,7 @@ MetaScannerVisitor visitor = new MetaScannerVisitor() { public boolean processRow(Result result) throws IOException { try { - byte[] value = - result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER); + byte[] value = result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER); HRegionInfo info = null; if (value != null) { info = Writables.getHRegionInfo(value); @@ -411,9 +410,7 @@ scan.addColumn(CATALOG_FAMILY, REGIONINFO_QUALIFIER); ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName, HConstants.META_TABLE_NAME) ? - HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), - scan.getStartRow(), - scan); + HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), scan); try { // Open scanner getRegionServerWithRetries(s); @@ -542,7 +539,7 @@ */ private HRegionLocation locateRegionInMeta(final byte [] parentTable, final byte [] tableName, final byte [] row, boolean useCache) - throws IOException{ + throws IOException { HRegionLocation location = null; // If supposed to be using the cache, then check it for a possible hit. // Otherwise, delete any existing cached location so it won't interfere. @@ -554,7 +551,8 @@ } else { deleteCachedLocation(tableName, row); } - + LOG.info("REOMVE parentTable=" + Bytes.toString(parentTable) + ", tableName=" + Bytes.toString(tableName) + ", row=" + Bytes.toString(row)); + // build the key of the meta region we should be looking for. // the extra 9's on the end are necessary to allow "exact" matches // without knowing the precise region names. 
@@ -579,7 +577,7 @@ if (regionInfoRow == null) { throw new TableNotFoundException(Bytes.toString(tableName)); } - + LOG.info("CLIENT GETCLOSEST " + regionInfoRow); byte [] value = regionInfoRow.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER); if (value == null || value.length == 0) { @@ -614,6 +612,7 @@ location = new HRegionLocation(regionInfo, new HServerAddress(serverAddress)); cacheLocation(tableName, location); + LOG.info("CACHING " + location); return location; } catch (TableNotFoundException e) { // if we got this error, probably means the table just plain doesn't @@ -969,7 +968,7 @@ throw (DoNotRetryIOException) t; } } - return null; + return null; } private HRegionLocation Index: src/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 798510) +++ src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -1,3 +1,4 @@ + /** * Copyright 2008 The Apache Software Foundation * @@ -23,6 +24,10 @@ import java.io.IOException; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.ipc.RemoteException; +import org.mortbay.log.Log; /** @@ -34,20 +39,16 @@ private boolean instantiated = false; private boolean closed = false; private Scan scan; - private byte [] startRow; private int caching = 1; /** * @param connection * @param tableName - * @param startRow * @param scan */ - public ScannerCallable (HConnection connection, byte [] tableName, - byte [] startRow, Scan scan) { - super(connection, tableName, startRow); + public ScannerCallable (HConnection connection, byte [] tableName, Scan scan) { + super(connection, tableName, scan.getStartRow()); this.scan = scan; - this.startRow = startRow; } /** @@ -67,18 +68,40 @@ */ public Result [] call() throws IOException { if 
(scannerId != -1L && closed) { - server.close(scannerId); - scannerId = -1L; + close(); } else if (scannerId == -1L && !closed) { - // open the scanner - scannerId = openScanner(); + this.scannerId = openScanner(); } else { - Result [] rrs = server.next(scannerId, caching); + Result [] rrs = null; + try { + rrs = server.next(scannerId, caching); + } catch (IOException e) { + IOException ioe = null; + if (e instanceof RemoteException) { + ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e); + } + if (ioe != null && ioe instanceof NotServingRegionException) { + this.scannerId = -1L; + xx + } + } return rrs == null || rrs.length == 0? null: rrs; } return null; } + private void close() { + if (this.scannerId == -1L) { + return; + } + try { + this.server.close(this.scannerId); + } catch (IOException e) { + Log.warn("Ignore, probably already closed", e); + } + this.scannerId = -1L; + } + protected long openScanner() throws IOException { return server.openScanner( this.location.getRegionInfo().getRegionName(), scan);