### Eclipse Workspace Patch 1.0 #P apache-0.94 Index: src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ipc.HBaseRPC; @@ -259,4 +260,14 @@ } return t; } + + /** + * @return the HRegionInfo for the current region + */ + public HRegionInfo getHRegionInfo() { + if (this.location == null) { + return null; + } + return this.location.getRegionInfo(); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -73,6 +73,9 @@ // if heap == null and lastTop != null, you need to reseek given the key below private KeyValue lastTop = null; + // A flag whether use pread for scan + private boolean scanUsePread = false; + /** An internal constructor. */ private StoreScanner(Store store, boolean cacheBlocks, Scan scan, final NavigableSet columns, long ttl, int minVersions) { @@ -91,6 +94,7 @@ // for multi-row (non-"get") scans because this is not done in // StoreFile.passesBloomFilter(Scan, SortedSet). 
useRowColBloom = numCol > 1 || (!isGet && numCol == 1); + this.scanUsePread = scan.isSmall(); } /** @@ -218,7 +222,8 @@ */ private List getScannersNoCompaction() throws IOException { final boolean isCompaction = false; - return selectScannersFrom(store.getScanners(cacheBlocks, isGet, + boolean usePread = isGet || scanUsePread; + return selectScannersFrom(store.getScanners(cacheBlocks, usePread, isCompaction, matcher)); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -1059,7 +1059,7 @@ * @return all scanners for this store */ protected List getScanners(boolean cacheBlocks, - boolean isGet, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher) throws IOException { List storeFiles; @@ -1078,7 +1078,7 @@ // but now we get them in ascending order, which I think is // actually more correct, since memstore get put at the end. List sfScanners = StoreFileScanner - .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher); + .getScannersForStoreFiles(storeFiles, cacheBlocks, usePread, isCompaction, matcher); List scanners = new ArrayList(sfScanners.size()+1); scanners.addAll(sfScanners); Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -647,4 +647,17 @@ @Override public void stop(String why); + + /** + * Perform scan operation. 
+ * @param regionName name of region to scan + * @param scan the scan operation to perform + * @param numberOfRows the maximum number of rows to fetch + * @return Array of Results; the array is empty if done with this region and null + * if we are NOT to go to the next region (happens when a filter rules + * that the scan is done). + * @throws IOException e + */ + public Result[] scan(byte[] regionName, Scan scan, int numberOfRows) + throws IOException; } Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -42,19 +42,19 @@ */ public class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); - private Scan scan; - private boolean closed = false; + protected Scan scan; + protected boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. - private HRegionInfo currentRegion = null; + protected HRegionInfo currentRegion = null; private ScannerCallable callable = null; - private final LinkedList cache = new LinkedList(); - private final int caching; - private long lastNext; + protected final LinkedList cache = new LinkedList(); + protected final int caching; + protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. 
- private Result lastResult = null; - private ScanMetrics scanMetrics = null; - private final long maxScannerResultSize; + protected Result lastResult = null; + protected ScanMetrics scanMetrics = null; + protected final long maxScannerResultSize; private final HConnection connection; private final byte[] tableName; private final int scannerTimeout; @@ -117,6 +117,11 @@ } // initialize the scanner + initializeScannerInConstruction(); + } + + protected void initializeScannerInConstruction() throws IOException{ + // initialize the scanner nextScanner(this.caching, false); } @@ -137,7 +142,7 @@ } // returns true if the passed region endKey - private boolean checkScanStopRow(final byte [] endKey) { + protected boolean checkScanStopRow(final byte [] endKey) { if (this.scan.getStopRow().length > 0) { // there is a stop row, check to see if we are past it. byte [] stopRow = scan.getStopRow(); @@ -234,7 +239,7 @@ * * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)) */ - private void writeScanMetrics() throws IOException { + protected void writeScanMetrics() throws IOException { if (this.scanMetrics == null) { return; } Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1521714) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -4940,6 +4940,43 @@ TEST_UTIL.deleteTable(TABLE); } + @Test + public void testSmallScan() throws Exception { + // Test Initialization. 
+ byte[] TABLE = Bytes.toBytes("testSmallScan"); + HTable table = TEST_UTIL.createTable(TABLE, FAMILY); + + // Insert the test rows + int insertNum = 10; + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("row" + String.format("%03d", i))); + put.add(FAMILY, QUALIFIER, VALUE); + table.put(put); + } + + // normal scan + ResultScanner scanner = table.getScanner(new Scan()); + int count = 0; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + + // small scan + Scan scan = new Scan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setSmall(true); + scan.setCaching(2); + scanner = table.getScanner(scan); + count = 0; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2523,8 +2523,15 @@ // // remote scanner interface // + + public long openScanner(byte[] regionName, Scan scan) throws IOException { + RegionScanner s = internalOpenScanner(regionName, scan); + long scannerId = addScanner(s); + return scannerId; + } - public long openScanner(byte[] regionName, Scan scan) throws IOException { + private RegionScanner internalOpenScanner(byte[] regionName, Scan scan) + throws IOException { checkOpen(); NullPointerException npe = null; if (regionName == null) { @@ -2559,7 +2566,7 @@ s = savedScanner; } } - return addScanner(s); + return s; } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t, "Failed openScanner")); } @@ -2589,16 +2596,23 @@ LOG.info("Client tried 
to access missing scanner " + scannerName); throw new UnknownScannerException("Name: " + scannerName); } + return internalNext(s, nbRows, scannerName); + } + + private Result[] internalNext(final RegionScanner s, int nbRows, + String scannerName) throws IOException { try { checkOpen(); } catch (IOException e) { // If checkOpen failed, server not running or filesystem gone, // cancel this lease; filesystem is gone or we're closing or something. - try { - this.leases.cancelLease(scannerName); - } catch (LeaseException le) { - LOG.info("Server shutting down and client tried to access missing scanner " + - scannerName); + if (scannerName != null) { + try { + this.leases.cancelLease(scannerName); + } catch (LeaseException le) { + LOG.info("Server shutting down and client tried to access missing scanner " + + scannerName); + } } throw e; } @@ -2607,7 +2621,9 @@ // Remove lease while its being processed in server; protects against case // where processing of request takes > lease expiration time. try { - lease = this.leases.removeLease(scannerName); + if (scannerName != null) { + lease = this.leases.removeLease(scannerName); + } } catch (LeaseException le) { // What it really means is that there's no such scanner. LOG.info("Client tried to access missing scanner " + scannerName + " (no lease)"); @@ -2677,25 +2693,30 @@ return s.isFilterDone() && results.isEmpty() ? null : results.toArray(new Result[0]); } catch (Throwable t) { - if (t instanceof NotServingRegionException) { + if (t instanceof NotServingRegionException && scannerName != null) { this.scanners.remove(scannerName); } throw convertThrowableToIOE(cleanup(t)); } finally { // We're done. On way out readd the above removed lease. Adding resets // expiration time on lease. 
- if (this.scanners.containsKey(scannerName)) { + if (scannerName != null && this.scanners.containsKey(scannerName)) { if (lease != null) this.leases.addLease(lease); } } } public void close(final long scannerId) throws IOException { + String scannerName = String.valueOf(scannerId); + RegionScanner s = scanners.get(scannerName); + internalCloseScanner(s, scannerName); + } + + private void internalCloseScanner(final RegionScanner s, String scannerName) + throws IOException { try { checkOpen(); requestCount.incrementAndGet(); - String scannerName = String.valueOf(scannerId); - RegionScanner s = scanners.get(scannerName); HRegion region = null; if (s != null) { @@ -2707,14 +2728,18 @@ } } } + RegionScanner toCloseScanner = s; + if (scannerName != null) { + toCloseScanner = scanners.remove(scannerName); + } + if (toCloseScanner != null) { + toCloseScanner.close(); + if (scannerName != null) { + this.leases.cancelLease(scannerName); + } - s = scanners.remove(scannerName); - if (s != null) { - s.close(); - this.leases.cancelLease(scannerName); - if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerClose(s); + region.getCoprocessorHost().postScannerClose(toCloseScanner); } } } catch (Throwable t) { @@ -2722,6 +2747,18 @@ } } + @Override + public Result[] scan(byte[] regionName, Scan scan, int numberOfRows) + throws IOException { + RegionScanner s = internalOpenScanner(regionName, scan); + try { + Result[] results = internalNext(s, numberOfRows, null); + return results; + } finally { + internalCloseScanner(s, null); + } + } + /** * Instantiated as a scanner lease. 
If the lease times out, the scanner is * closed Index: src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (revision 0) @@ -0,0 +1,210 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.protobuf.ServiceException; + +/** + * Client scanner for small scans. Generally, only one RPC is called to fetch the + * scan results, unless the results cross multiple regions or the row count of + * results exceeds the caching. 
+ * + * For small scans, it performs better than {@link ClientScanner}. + */ +public class ClientSmallScanner extends ClientScanner { + private final Log LOG = LogFactory.getLog(this.getClass()); + private ServerCallable smallScanCallable = null; + // When fetching results from server, skip the first result if it has the same + // row as this one + private byte[] skipRowOfFirstResult = null; + + /** + * Create a new ClientSmallScanner for the specified table. An HConnection + * will be retrieved using the passed Configuration. Note that the passed + * {@link Scan}'s start row may be changed. + * + * @param conf The {@link Configuration} to use. + * @param scan {@link Scan} to use in this scanner + * @param tableName The table that we wish to scan + * @throws IOException + */ + public ClientSmallScanner(final Configuration conf, final Scan scan, + final byte[] tableName) throws IOException { + this(conf, scan, tableName, HConnectionManager.getConnection(conf)); + } + + /** + * Create a new ClientSmallScanner for the specified table. The passed + * HConnection is handed to {@link ClientScanner}. Note that the passed + * {@link Scan}'s start row may be changed. + * @param conf The {@link Configuration} to use. + * @param scan {@link Scan} to use in this scanner + * @param tableName The table that we wish to scan + * @param connection Connection handed to the {@link ClientScanner} constructor + * @throws IOException + */ + public ClientSmallScanner(final Configuration conf, final Scan scan, + final byte[] tableName, HConnection connection) throws IOException { + super(conf, scan, tableName, connection); + } + + @Override + protected void initializeScannerInConstruction() throws IOException { + // No need to initialize the scanner when constructing instance, do it when + // calling next(). Do nothing here. + } + + /** + * Gets a scanner for the following scan. Move to next region or continue from the + * last result or start from the start row. + * @param nbRows number of rows to request from the server + * @param done true if Server-side says we're done scanning. 
+ * @param currentRegionDone true if scan is over on current region + * @return true if has next scanner + * @throws IOException + */ + private boolean nextScanner(int nbRows, final boolean done, + boolean currentRegionDone) throws IOException { + // Where to start the next getter + byte[] localStartKey; + int cacheNum = nbRows; + skipRowOfFirstResult = null; + // if we're at end of table, close and return false to stop iterating + if (this.currentRegion != null && currentRegionDone) { + byte[] endKey = this.currentRegion.getEndKey(); + if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(endKey) || done) { + close(); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished with small scan at " + this.currentRegion); + } + return false; + } + localStartKey = endKey; + if (LOG.isDebugEnabled()) { + LOG.debug("Finished with region " + this.currentRegion); + } + } else if (this.lastResult != null) { + localStartKey = this.lastResult.getRow(); + skipRowOfFirstResult = this.lastResult.getRow(); + cacheNum++; + } else { + localStartKey = this.scan.getStartRow(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Advancing internal small scanner to startKey at '" + + Bytes.toStringBinary(localStartKey) + "'"); + } + smallScanCallable = getSmallScanCallable(localStartKey, cacheNum); + if (this.scanMetrics != null && skipRowOfFirstResult == null) { + this.scanMetrics.countOfRegions.inc(); + } + return true; + } + + private ServerCallable getSmallScanCallable(byte[] localStartKey, + final int nbRows) { + this.scan.setStartRow(localStartKey); + ServerCallable callable = new ServerCallable( + getConnection(), getTableName(), scan.getStartRow()) { + public Result[] call() throws IOException { + return server.scan(location.getRegionInfo().getRegionName(), scan, + nbRows); + } + }; + return callable; + } + + @Override + public Result next() throws IOException { + // If the scanner is closed and there's nothing left in the cache, next is a + // 
no-op. + if (cache.size() == 0 && this.closed) { + return null; + } + if (cache.size() == 0) { + Result[] values = null; + long remainingResultSize = maxScannerResultSize; + int countdown = this.caching; + boolean currentRegionDone = false; + // Values == null means server-side filter has determined we must STOP + while (remainingResultSize > 0 && countdown > 0 + && nextScanner(countdown, values == null, currentRegionDone)) { + // Server returns a null values if scanning is to stop. Else, + // returns an empty array if scanning is to go on and we've just + // exhausted current region. + values = smallScanCallable.withRetries(); + this.currentRegion = smallScanCallable.getHRegionInfo(); + long currentTime = System.currentTimeMillis(); + if (this.scanMetrics != null) { + this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime + - lastNext); + } + lastNext = currentTime; + if (values != null && values.length > 0) { + for (int i = 0; i < values.length; i++) { + Result rs = values[i]; + if (i == 0 && this.skipRowOfFirstResult != null + && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { + // Skip the first result + continue; + } + cache.add(rs); + for (KeyValue kv : rs.raw()) { + remainingResultSize -= kv.heapSize(); + } + countdown--; + this.lastResult = rs; + } + } + currentRegionDone = countdown > 0; + } + } + + if (cache.size() > 0) { + return cache.poll(); + } + // if we exhausted this scanner before calling close, write out the scan + // metrics + writeScanMetrics(); + return null; + } + + @Override + public void close() { + closed = true; + try { + writeScanMetrics(); + } catch (IOException e) { + // As ClientScanner#close, we don't want the scanner close() method to + // throw. 
+ } + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -671,6 +671,10 @@ if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } + if (scan.isSmall()) { + return new ClientSmallScanner(getConfiguration(), scan, getTableName(), + this.connection); + } return new ClientScanner(getConfiguration(), scan, getTableName(), this.connection); } Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1521714) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -83,6 +83,7 @@ private static final String RAW_ATTR = "_raw_"; private static final String ONDEMAND_ATTR = "_ondemand_"; private static final String ISOLATION_LEVEL = "_isolationlevel_"; + private static final String SMALL_ATTR = "_small_"; private static final byte SCAN_VERSION = (byte)2; private byte [] startRow = HConstants.EMPTY_START_ROW; @@ -491,6 +492,39 @@ } /** + * Set whether this scan is a small scan + *

+ * Small scan should use pread and big scan can use seek + read + * + * seek + read is fast but can cause two problems: (1) resource contention (2) + * too much network IO + * + * [89-fb] Using pread for non-compaction read request + * https://issues.apache.org/jira/browse/HBASE-7266 + * + * On the other hand, if set to true, we do + * openScanner, next and closeScanner in one RPC call, which means better + * performance for small scans. [HBASE-9488]. + * + * Generally, if the scan range is within one data block(64KB), it could be + * considered as a small scan. + * + * @param small true if this scan is a small scan + */ + public void setSmall(boolean small) { + setAttribute(SMALL_ATTR, Bytes.toBytes(small)); + } + + /** + * Get whether this scan is a small scan + * @return true if small scan + */ + public boolean isSmall() { + byte[] attr = getAttribute(SMALL_ATTR); + return attr == null ? false : Bytes.toBoolean(attr); + } + + /** * Compile the table and column family (i.e. schema) information * into a String. Useful for parsing and aggregation by debugging, * logging, and administration tools.