From f7a15ea084b0068b19b5a00a42d11feb115a67e5 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 23 Nov 2016 11:46:32 +0800 Subject: [PATCH] HBASE-16984 Implement getScanner --- .../hbase/client/AllowPartialScanResultCache.java | 2 +- .../hadoop/hbase/client/AsyncResultScanner.java | 204 +++++++++++++++++++++ .../AsyncScanSingleRegionRpcRetryingCaller.java | 2 +- .../org/apache/hadoop/hbase/client/AsyncTable.java | 26 +++ .../apache/hadoop/hbase/client/AsyncTableImpl.java | 16 ++ .../hbase/client/CompleteScanResultCache.java | 15 +- .../hbase/client/AbstractTestAsyncTableScan.java | 28 ++- .../hadoop/hbase/client/TestAsyncTableScan.java | 19 +- .../hadoop/hbase/client/TestAsyncTableScanner.java | 86 +++++++++ .../client/TestCompleteResultScanResultCache.java | 28 ++- 10 files changed, 399 insertions(+), 27 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java index ab26587..caecfd4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -41,7 +41,7 @@ class AllowPartialScanResultCache implements ScanResultCache { } private void updateLastCell(Result result) { - lastCell = result.isPartial() ? result.rawCells()[result.rawCells().length - 1] : null; + lastCell = result.rawCells()[result.rawCells().length - 1]; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java new file mode 100644 index 0000000..a179da2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; +import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.function.Function; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * The {@link ResultScanner} implementation for {@link AsyncTable}. + *

+ * It will fetch data automatically in background and cache it in memory. If {@link #cacheSize} is + * greater than {@link #maxCacheSize} we will stop the background fetch temporarily to avoid OOM. + * And then if the {@link #cacheSize} is less than half of {@link #maxCacheSize}, we will resume the + * background scan. + *

+ * Typically the {@link #maxCacheSize} will be {@code 2 * scan.getMaxResultSize()}. + */ +@InterfaceAudience.Private +class AsyncResultScanner implements ResultScanner, ScanResultConsumer { + + private static final Log LOG = LogFactory.getLog(AsyncResultScanner.class); + + private final RawAsyncTable rawTable; + + private final Scan scan; + + private final long maxCacheSize; + + private final Queue queue = new ArrayDeque<>(); + + private long cacheSize; + + private boolean closed = false; + + private Throwable error; + + private boolean prefetchStopped; + + private boolean ignoreOnComplete; + + // used to filter out cells that already returned when we restart a scan + private Cell lastCell; + + private Function createClosestRow; + + public AsyncResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { + this.rawTable = table; + this.scan = scan; + this.maxCacheSize = maxCacheSize; + this.createClosestRow = scan.isReversed() ? ConnectionUtils::createClosestRowBefore + : ConnectionUtils::createClosestRowAfter; + table.scan(scan, this); + } + + private void addToCache(Result result) { + queue.add(result); + cacheSize += calcEstimatedSize(result); + } + + private void stopPrefetch(Result lastResult) { + prefetchStopped = true; + if (lastResult.isPartial() || scan.getBatch() > 0) { + scan.setStartRow(lastResult.getRow()); + lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1]; + } else { + scan.setStartRow(createClosestRow.apply(lastResult.getRow())); + } + if (LOG.isDebugEnabled()) { + LOG.debug(System.identityHashCode(this) + " stop prefetching when scanning " + + rawTable.getName() + " as the cache size " + cacheSize + + " is greater than the maxCacheSize + " + maxCacheSize + ", the next start row is " + + Bytes.toStringBinary(scan.getStartRow()) + ", lastCell is " + lastCell); + } + // ignore the first onComplete call as the scan is stopped by us. + ignoreOnComplete = true; + } + + @Override + public synchronized boolean onNext(Result[] results) { + assert results.length > 0; + if (closed) { + return false; + } + Result firstResult = results[0]; + if (lastCell != null) { + firstResult = filterCells(firstResult, lastCell); + if (firstResult != null) { + // do not set lastCell to null if the result after filtering is null as we may still need to + lastCell = null; + addToCache(firstResult); + } else if (results.length == 1) { + // the only one result is null + return true; + } + } else { + addToCache(firstResult); + } + for (int i = 1; i < results.length; i++) { + addToCache(results[i]); + } + notifyAll(); + if (cacheSize < maxCacheSize) { + return true; + } + stopPrefetch(results[results.length - 1]); + return false; + } + + @Override + public synchronized boolean onHeartbeat() { + return !closed; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + } + + @Override + public synchronized void onComplete() { + if (ignoreOnComplete) { + // The scan is stopped by us due to cache size limit, do not set closed as we may resume the + // scan later. + ignoreOnComplete = false; + return; + } + closed = true; + notifyAll(); + } + + private void resumePrefetch() { + if (LOG.isDebugEnabled()) { + LOG.debug(System.identityHashCode(this) + " resume prefetching"); + } + prefetchStopped = false; + rawTable.scan(scan, this); + } + + @Override + public synchronized Result next() throws IOException { + while (queue.isEmpty()) { + if (closed) { + return null; + } + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } + try { + wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + Result result = queue.poll(); + cacheSize -= calcEstimatedSize(result); + if (prefetchStopped && cacheSize <= maxCacheSize / 2) { + resumePrefetch(); + } + return result; + } + + @Override + public synchronized void close() { + closed = true; + queue.clear(); + cacheSize = 0; + notifyAll(); + } + + @Override + public boolean renewLease() { + return false; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 0efac7f..bb31ac4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -246,7 +246,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void updateNextStartRowWhenError(Result result) { nextStartRowWhenError = result.getRow(); - includeNextStartRowWhenError = result.isPartial(); + includeNextStartRowWhenError = scan.getBatch() > 0 || result.isPartial(); } private void completeWhenNoMoreResultsInRegion() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index e082d10..9e7472c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -30,4 +30,30 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Unstable public interface AsyncTable extends AsyncTableBase { + + /** + * Gets a scanner on the current table for the given family. + * @param family The column family to scan. + * @return A scanner. + */ + default ResultScanner getScanner(byte[] family) { + return getScanner(new Scan().addFamily(family)); + } + + /** + * Gets a scanner on the current table for the given family and qualifier. + * @param family The column family to scan. + * @param qualifier The column qualifier to scan. + * @return A scanner. + */ + default ResultScanner getScanner(byte[] family, byte[] qualifier) { + return getScanner(new Scan().addColumn(family, qualifier)); + } + + /** + * Returns a scanner on the current table as specified by the {@link Scan} object. + * @param scan A configured {@link Scan} object. + * @return A scanner. + */ + ResultScanner getScanner(Scan scan); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 6e1233d..89faf73 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.util.ReflectionUtils; /** * The implementation of AsyncTable. Based on {@link RawAsyncTable}. @@ -37,9 +38,12 @@ class AsyncTableImpl implements AsyncTable { private final ExecutorService pool; + private final long defaultScannerMaxResultSize; + public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) { this.rawTable = conn.getRawTable(tableName); this.pool = pool; + this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); } @Override @@ -156,4 +160,16 @@ class AsyncTableImpl implements AsyncTable { public CompletableFuture> smallScan(Scan scan, int limit) { return wrap(rawTable.smallScan(scan, limit)); } + + private long resultSize2CacheSize(long maxResultSize) { + // * 2 if possible + return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; + } + + @Override + public ResultScanner getScanner(Scan scan) { + return new AsyncResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan), + resultSize2CacheSize( + scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java index 538aecb..9dfb8f7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java @@ -40,9 +40,22 @@ class CompleteScanResultCache implements ScanResultCache { } private Result[] prependCombined(Result[] results, int length) throws IOException { + if (length == 0) { + return new Result[] { combine() }; + } + // the last part of a partial result may not be marked as partial so here we need to check if + // there is a row change. + int start; + if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) { + partialResults.add(results[0]); + start = 1; + length--; + } else { + start = 0; + } Result[] prependResults = new Result[length + 1]; prependResults[0] = combine(); - System.arraycopy(results, 0, prependResults, 1, length); + System.arraycopy(results, start, prependResults, 1, length); return prependResults; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 220be10..3028111 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -18,10 +18,15 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -59,10 +64,9 @@ public abstract class AbstractTestAsyncTableScan { ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); List> futures = new ArrayList<>(); - IntStream.range(0, COUNT) - .forEach(i -> futures.add(table.put( - new Put(Bytes.toBytes(String.format("%03d", i))).addColumn(FAMILY, CQ1, Bytes.toBytes(i)) - .addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); + IntStream.range(0, COUNT).forEach( + i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); } @@ -76,6 +80,22 @@ public abstract class AbstractTestAsyncTableScan { protected abstract List doScan(Scan scan) throws Exception; + private Result convertToPartial(Result result) { + return Result.create(result.rawCells(), result.getExists(), result.isStale(), true); + } + + protected final List convertFromBatchResult(List results) { + assertTrue(results.size() % 2 == 0); + return IntStream.range(0, results.size() / 2).mapToObj(i -> { + try { + return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)), + convertToPartial(results.get(2 * i + 1)))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + } + @Test public void testScanAll() throws Exception { List results = doScan(createScan()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index 65fb1ad..dc593d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,20 +17,15 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertTrue; - import com.google.common.base.Throwables; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Queue; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -119,10 +114,6 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { return scanCreater.get(); } - private Result convertToPartial(Result result) { - return Result.create(result.rawCells(), result.getExists(), result.isStale(), true); - } - @Override protected List doScan(Scan scan) throws Exception { SimpleScanResultConsumer scanConsumer = new SimpleScanResultConsumer(); @@ -132,15 +123,7 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { results.add(result); } if (scan.getBatch() > 0) { - assertTrue(results.size() % 2 == 0); - return IntStream.range(0, results.size() / 2).mapToObj(i -> { - try { - return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)), - convertToPartial(results.get(2 * i + 1)))); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }).collect(Collectors.toList()); + results = convertFromBatchResult(results); } return results; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java new file mode 100644 index 0000000..006770d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ LargeTests.class, ClientTests.class }) +public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { + + @Parameter + public Supplier scanCreater; + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestAsyncTableScanner::createNormalScan }, + new Supplier[] { TestAsyncTableScanner::createBatchScan }, + new Supplier[] { TestAsyncTableScanner::createSmallResultSizeScan }, + new Supplier[] { TestAsyncTableScanner::createBatchSmallResultSizeScan }); + } + + private static Scan createNormalScan() { + return new Scan(); + } + + private static Scan createBatchScan() { + return new Scan().setBatch(1); + } + + // set a small result size for testing flow control + private static Scan createSmallResultSizeScan() { + return new Scan().setMaxResultSize(1); + } + + private static Scan createBatchSmallResultSizeScan() { + return new Scan().setBatch(1).setMaxResultSize(1); + } + + @Override + protected Scan createScan() { + return scanCreater.get(); + } + + @Override + protected List doScan(Scan scan) throws Exception { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + List results = new ArrayList<>(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result result; (result = scanner.next()) != null;) { + results.add(result); + } + } + if (scan.getBatch() > 0) { + results = convertFromBatchResult(results); + } + return results; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java index d688d0a..a340e9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java @@ -139,8 +139,8 @@ public class TestCompleteResultScanResultCache { assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); - Result[] results = resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, - false); + Result[] results = + resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false); assertEquals(2, results.length); assertEquals(1, Bytes.toInt(results[0].getRow())); assertEquals(2, results[0].rawCells().length); @@ -156,4 +156,28 @@ public class TestCompleteResultScanResultCache { assertEquals(1, results[0].rawCells().length); assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1))); } + + @Test + public void testCombine4() throws IOException { + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false); + Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true); + Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false); + + assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + + Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false); + assertEquals(1, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + + results = resultCache.addAndGet(new Result[] { nextResult2 }, false); + assertEquals(1, results.length); + assertEquals(2, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ2))); + } } -- 2.7.4