From cf7775dee005563b07f6455287a7737dc4f39238 Mon Sep 17 00:00:00 2001 From: Vikas Vishwakarma Date: Mon, 23 Jul 2018 15:44:45 +0530 Subject: [PATCH] HBASE-20896 Port HBASE-20866 to branch-1 and branch-1.4 --- .../hbase/client/AllowPartialScanResultCache.java | 41 +++++----- .../hadoop/hbase/client/BatchScanResultCache.java | 45 ++++++----- .../apache/hadoop/hbase/client/ClientScanner.java | 22 ++---- .../hbase/client/CompleteScanResultCache.java | 60 +++++++++------ .../hadoop/hbase/client/ConnectionUtils.java | 9 ++- .../hadoop/hbase/client/ScanResultCache.java | 56 +++++++++++++- .../client/TestAllowPartialScanResultCache.java | 26 ++++--- .../hbase/client/TestBatchScanResultCache.java | 31 +++++--- .../client/TestCompleteResultScanResultCache.java | 88 ++++++++++++++++------ 9 files changed, 250 insertions(+), 128 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java index 5b6c411..0d7c414 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; import java.io.IOException; -import java.util.Arrays; +import java.util.LinkedList; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * that skips the cells that have already been returned. */ @InterfaceAudience.Private -class AllowPartialScanResultCache implements ScanResultCache { +class AllowPartialScanResultCache extends ScanResultCache { // used to filter out the cells that already returned to user as we always start from the // beginning of a row when retry. 
@@ -41,7 +41,9 @@ class AllowPartialScanResultCache implements ScanResultCache { private boolean lastResultPartial; - private int numberOfCompleteRows; + public AllowPartialScanResultCache(LinkedList cache) { + this.cache = cache; + } private void recordLastResult(Result result) { lastCell = result.rawCells()[result.rawCells().length - 1]; @@ -49,14 +51,14 @@ class AllowPartialScanResultCache implements ScanResultCache { } @Override - public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + public void loadResultsToCache(Result[] results, boolean isHeartbeatMessage) throws IOException { if (results.length == 0) { if (!isHeartbeatMessage && lastResultPartial) { // An empty non heartbeat result indicate that there must be a row change. So if the // lastResultPartial is true then we need to increase numberOfCompleteRows. numberOfCompleteRows++; } - return EMPTY_RESULT_ARRAY; + return; } int i; for (i = 0; i < results.length; i++) { @@ -67,31 +69,34 @@ class AllowPartialScanResultCache implements ScanResultCache { } } if (i == results.length) { - return EMPTY_RESULT_ARRAY; + return; } if (lastResultPartial && !CellUtil.matchingRow(lastCell, results[0].getRow())) { // there is a row change, so increase numberOfCompleteRows numberOfCompleteRows++; } recordLastResult(results[results.length - 1]); - if (i > 0) { - results = Arrays.copyOfRange(results, i, results.length); + addResultArrayToCache(results, i, results.length); + } + + @Override + protected void addResultToCache(Result rs) { + if (!rs.mayHaveMoreCellsInRow()) { + numberOfCompleteRows++; } - for (Result result : results) { - if (!result.mayHaveMoreCellsInRow()) { - numberOfCompleteRows++; - } + cache.add(rs); + for (Cell cell : rs.rawCells()) { + resultSize += CellUtil.estimatedHeapSizeOf(cell); } - return results; + count++; + lastResult = rs; } @Override public void clear() { // we do not cache anything - } - - @Override - public int numberOfCompleteRows() { - return 
numberOfCompleteRows; + count = 0; + resultSize = 0; + lastResult = null; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java index 293f411..5ad90c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java @@ -21,9 +21,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; import java.io.IOException; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Deque; -import java.util.List; +import java.util.LinkedList; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -38,7 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes; * doesn't mean setAllowPartialResult(true). */ @InterfaceAudience.Private -public class BatchScanResultCache implements ScanResultCache { +public class BatchScanResultCache extends ScanResultCache { private final int batch; @@ -52,9 +51,8 @@ public class BatchScanResultCache implements ScanResultCache { private int numCellsOfPartialResults; - private int numberOfCompleteRows; - - public BatchScanResultCache(int batch) { + public BatchScanResultCache(LinkedList cache, int batch) { + this.cache = cache; this.batch = batch; } @@ -109,11 +107,12 @@ public class BatchScanResultCache implements ScanResultCache { } @Override - public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + public void loadResultsToCache(Result[] results, boolean isHeartbeatMessage) throws IOException { if (results.length == 0) { if (!isHeartbeatMessage) { if (!partialResults.isEmpty()) { - return new Result[] { createCompletedResult() }; + addResultToCache(createCompletedResult()); + return; } if (lastResultPartial) { // An empty non heartbeat result indicate that there must be a row change. 
So if the @@ -121,9 +120,8 @@ public class BatchScanResultCache implements ScanResultCache { numberOfCompleteRows++; } } - return EMPTY_RESULT_ARRAY; + return; } - List regroupedResults = new ArrayList<>(); for (Result result : results) { result = filterCells(result, lastCell); if (result == null) { @@ -132,7 +130,7 @@ public class BatchScanResultCache implements ScanResultCache { if (!partialResults.isEmpty()) { if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) { // there is a row change - regroupedResults.add(createCompletedResult()); + addResultToCache(createCompletedResult()); } } else if (lastResultPartial && !CellUtil.matchingRow(lastCell, result.getRow())) { // As for batched scan we may return partial results to user if we reach the batch limit, so @@ -143,33 +141,40 @@ public class BatchScanResultCache implements ScanResultCache { // check if we have a row change if (!partialResults.isEmpty() && !Bytes.equals(partialResults.peek().getRow(), result.getRow())) { - regroupedResults.add(createCompletedResult()); + addResultToCache(createCompletedResult()); } Result regroupedResult = regroupResults(result); if (regroupedResult != null) { if (!regroupedResult.mayHaveMoreCellsInRow()) { numberOfCompleteRows++; } - regroupedResults.add(regroupedResult); + addResultToCache(regroupedResult); // only update last cell when we actually return it to user. 
recordLastResult(regroupedResult); } if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) { // We are done for this row - regroupedResults.add(createCompletedResult()); + addResultToCache(createCompletedResult()); } } - return regroupedResults.toArray(new Result[0]); } @Override - public void clear() { - partialResults.clear(); - numCellsOfPartialResults = 0; + protected void addResultToCache(Result rs) { + cache.add(rs); + for (Cell cell : rs.rawCells()) { + resultSize += CellUtil.estimatedHeapSizeOf(cell); + } + count++; + lastResult = rs; } @Override - public int numberOfCompleteRows() { - return numberOfCompleteRows; + public void clear() { + partialResults.clear(); + numCellsOfPartialResults = 0; + count = 0; + resultSize = 0; + lastResult = null; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index d548901..8665eea 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -30,8 +30,6 @@ import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -136,7 +134,7 @@ public abstract class ClientScanner extends AbstractClientScanner { this.rpcControllerFactory = controllerFactory; this.conf = conf; - this.scanResultCache = createScanResultCache(scan); + this.scanResultCache = createScanResultCache(scan, cache); } protected ClusterConnection getConnection() { @@ -464,19 +462,15 @@ public abstract class ClientScanner extends AbstractClientScanner { // Results to the 
scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows(); - Result[] resultsToAddToCache = - scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); + scanResultCache.loadResultsToCache(values, callable.isHeartbeatMessage()); int numberOfCompleteRows = scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; - if (resultsToAddToCache.length > 0) { - for (Result rs : resultsToAddToCache) { - cache.add(rs); - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - } + if (scanResultCache.getCount() > 0) { + remainingResultSize -= scanResultCache.getResultSize(); + scanResultCache.resetResultSize(); + countdown -= scanResultCache.getCount(); + scanResultCache.resetCount(); + this.lastResult = scanResultCache.getLastResult(); } if (scan.getLimit() > 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java index a132642..aa792be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -29,12 +31,14 @@ import org.apache.hadoop.hbase.util.Bytes; * A scan result cache that only returns complete result. 
*/ @InterfaceAudience.Private -class CompleteScanResultCache implements ScanResultCache { - - private int numberOfCompleteRows; +class CompleteScanResultCache extends ScanResultCache { private final List partialResults = new ArrayList<>(); + public CompleteScanResultCache(LinkedList cache) { + this.cache = cache; + } + private Result combine() throws IOException { Result result = Result.createCompleteResult(partialResults); partialResults.clear(); @@ -61,13 +65,8 @@ class CompleteScanResultCache implements ScanResultCache { return prependResults; } - private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) { - numberOfCompleteRows += results.length; - return results; - } - @Override - public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + public void loadResultsToCache(Result[] results, boolean isHeartbeatMessage) throws IOException { // If no results were returned it indicates that either we have the all the partial results // necessary to construct the complete result or the server had to send a heartbeat message // to the client to keep the client-server connection alive @@ -76,9 +75,9 @@ class CompleteScanResultCache implements ScanResultCache { // and thus there may be more partials server side that still need to be added to the partial // list before we form the complete Result if (!partialResults.isEmpty() && !isHeartbeatMessage) { - return updateNumberOfCompleteResultsAndReturn(combine()); + addResultToCache(combine()); } - return EMPTY_RESULT_ARRAY; + return; } // In every RPC response there should be at most a single partial result. Furthermore, if // there is a partial result, it is guaranteed to be in the last position of the array. 
@@ -86,37 +85,52 @@ class CompleteScanResultCache implements ScanResultCache { if (last.mayHaveMoreCellsInRow()) { if (partialResults.isEmpty()) { partialResults.add(last); - return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1)); + addResultArrayToCache(results, 0, results.length - 1); + return; } // We have only one result and it is partial if (results.length == 1) { // check if there is a row change if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) { partialResults.add(last); - return EMPTY_RESULT_ARRAY; + return; } Result completeResult = combine(); partialResults.add(last); - return updateNumberOfCompleteResultsAndReturn(completeResult); + addResultToCache(completeResult); + return; } // We have some complete results - Result[] resultsToReturn = prependCombined(results, results.length - 1); + Result[] resultsToCache = prependCombined(results, results.length - 1); partialResults.add(last); - return updateNumberOfCompleteResultsAndReturn(resultsToReturn); + addResultArrayToCache(resultsToCache, 0, resultsToCache.length); + return; } if (!partialResults.isEmpty()) { - return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length)); + Result[] resultsToCache = prependCombined(results, results.length); + addResultArrayToCache(resultsToCache, 0, resultsToCache.length); + return; } - return updateNumberOfCompleteResultsAndReturn(results); + addResultArrayToCache(results, 0, results.length); + return; } @Override - public void clear() { - partialResults.clear(); + protected void addResultToCache(Result rs) { + cache.add(rs); + for (Cell cell : rs.rawCells()) { + resultSize += CellUtil.estimatedHeapSizeOf(cell); + } + count++; + numberOfCompleteRows++; + lastResult = rs; } @Override - public int numberOfCompleteRows() { - return numberOfCompleteRows; + public void clear() { + partialResults.clear(); + count = 0; + resultSize = 0; + lastResult = null; } } diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 96e7788..e02d1cf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; +import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -288,13 +289,13 @@ public class ConnectionUtils { return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; } - public static ScanResultCache createScanResultCache(Scan scan) { + public static ScanResultCache createScanResultCache(Scan scan, LinkedList cache) { if (scan.getAllowPartialResults()) { - return new AllowPartialScanResultCache(); + return new AllowPartialScanResultCache(cache); } else if (scan.getBatch() > 0) { - return new BatchScanResultCache(scan.getBatch()); + return new BatchScanResultCache(cache, scan.getBatch()); } else { - return new CompleteScanResultCache(); + return new CompleteScanResultCache(cache); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java index 2d28e1a..651ec49 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.LinkedList; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -33,9 +34,14 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * */ @InterfaceAudience.Private -interface ScanResultCache { +public 
abstract class ScanResultCache { static final Result[] EMPTY_RESULT_ARRAY = new Result[0]; + protected int numberOfCompleteRows; + protected long resultSize = 0; + protected int count = 0; + protected Result lastResult = null; + protected LinkedList<Result> cache; /** * Add the given results to cache and get valid results back. @@ -43,16 +49,58 @@ interface ScanResultCache { * @param isHeartbeatMessage indicate whether the results is gotten from a heartbeat response. * @return valid results, never null. */ - Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException; + public abstract void loadResultsToCache(Result[] results, boolean isHeartbeatMessage) throws IOException; /** * Clear the cached result if any. Called when scan error and we will start from a start of a row * again. */ - void clear(); + public abstract void clear(); /** * Return the number of complete rows. Used to implement limited scan. */ - int numberOfCompleteRows(); + public int numberOfCompleteRows() { + return numberOfCompleteRows; + } + + /** + * Add result array received from server to cache + * @param resultsToAddToCache The array of Results returned from the server + * @param start start index to cache from Results array + * @param end last index to cache from Results array + */ + protected void addResultArrayToCache(Result[] resultsToAddToCache, int start, int end) { + if (resultsToAddToCache != null) { + for (int r = start; r < end; r++) { + addResultToCache(resultsToAddToCache[r]); + } + } + } + + /** + * Add the result received from server or result constructed from partials to cache + * @param rs Result to cache from Results array or constructed from partial results + */ + protected abstract void addResultToCache(Result rs); + + public long getResultSize() { + return resultSize; + } + + public int getCount() { + return count; + } + + public void resetResultSize() { + resultSize = 0; + } + + public void resetCount() { + count = 0; + } + + public Result getLastResult() 
{ + return lastResult; + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java index 3fe43a5..6938983 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; import java.io.IOException; import java.util.Arrays; +import java.util.LinkedList; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -39,10 +39,11 @@ public class TestAllowPartialScanResultCache { private static byte[] CF = Bytes.toBytes("cf"); private AllowPartialScanResultCache resultCache; + private static final LinkedList cache = new LinkedList(); @Before public void setUp() { - resultCache = new AllowPartialScanResultCache(); + resultCache = new AllowPartialScanResultCache(cache); } @After @@ -53,16 +54,17 @@ public class TestAllowPartialScanResultCache { @Test public void test() throws IOException { - assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, - resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); - assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, - resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, true); + assertEquals(0, cache.size()); Cell[] cells1 = createCells(CF, 1, 10); Cell[] cells2 = createCells(CF, 2, 10); - Result[] results1 = resultCache.addAndGet( + resultCache.loadResultsToCache( new Result[] { 
Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false); + Result[] results1 = cache.toArray(new Result[0]); assertEquals(1, results1.length); assertEquals(1, Bytes.toInt(results1[0].getRow())); assertEquals(5, results1[0].rawCells().length); @@ -70,8 +72,10 @@ public class TestAllowPartialScanResultCache { assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i)))); } - Result[] results2 = resultCache.addAndGet( + cache.clear(); + resultCache.loadResultsToCache( new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false); + Result[] results2 = cache.toArray(new Result[0]); assertEquals(1, results2.length); assertEquals(1, Bytes.toInt(results2[0].getRow())); assertEquals(5, results2[0].rawCells().length); @@ -79,8 +83,10 @@ public class TestAllowPartialScanResultCache { assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i)))); } - Result[] results3 = - resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false); + cache.clear(); + resultCache.loadResultsToCache( + new Result[] { Result.create(cells1), Result.create(cells2) }, false); + Result[] results3 = cache.toArray(new Result[0]); assertEquals(1, results3.length); assertEquals(2, Bytes.toInt(results3[0].getRow())); assertEquals(10, results3[0].rawCells().length); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java index 31a4594..a2dbf46 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; import java.io.IOException; import java.util.Arrays; +import java.util.LinkedList; 
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; @@ -40,9 +40,11 @@ public class TestBatchScanResultCache { private BatchScanResultCache resultCache; + protected final LinkedList cache = new LinkedList(); + @Before public void setUp() { - resultCache = new BatchScanResultCache(4); + resultCache = new BatchScanResultCache(cache, 4); } @After @@ -73,28 +75,34 @@ public class TestBatchScanResultCache { @Test public void test() throws IOException { - assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, - resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); - assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, - resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, true); + assertEquals(0, cache.size()); Cell[] cells1 = createCells(CF, 1, 10); Cell[] cells2 = createCells(CF, 2, 10); Cell[] cells3 = createCells(CF, 3, 10); - assertEquals(0, resultCache.addAndGet( - new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length); - Result[] results = resultCache.addAndGet( + resultCache.loadResultsToCache( + new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache( new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true), Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) }, false); + Result[] results = cache.toArray(new Result[0]); assertEquals(2, results.length); assertResultEquals(results[0], 1, 0, 4); assertResultEquals(results[1], 1, 4, 8); - results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false); + + cache.clear(); + resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false); + results = cache.toArray(new Result[0]); assertEquals(1, results.length); 
assertResultEquals(results[0], 1, 8, 10); - results = resultCache.addAndGet( + cache.clear(); + resultCache.loadResultsToCache( new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true), Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true), Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true), @@ -102,6 +110,7 @@ public class TestBatchScanResultCache { Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true), Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) }, false); + results = cache.toArray(new Result[0]); assertEquals(6, results.length); assertResultEquals(results[0], 2, 0, 4); assertResultEquals(results[1], 2, 4, 8); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java index 8759593..2ec372a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame; import java.io.IOException; import java.util.Arrays; +import java.util.LinkedList; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; @@ -46,9 +47,11 @@ public class TestCompleteResultScanResultCache { private CompleteScanResultCache resultCache; + private final LinkedList cache = new LinkedList(); + @Before public void setUp() { - resultCache = new CompleteScanResultCache(); + resultCache = new CompleteScanResultCache(cache); } @After @@ -63,33 +66,50 @@ public class TestCompleteResultScanResultCache { @Test public void testNoPartial() throws IOException { - assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, - resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); - assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, - 
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + cache.clear(); + + resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, true); + assertEquals(0, cache.size()); + int count = 10; Result[] results = new Result[count]; for (int i = 0; i < count; i++) { results[i] = Result.create(Arrays.asList(createCell(i, CQ1))); } - assertSame(results, resultCache.addAndGet(results, false)); + resultCache.loadResultsToCache(results, false); + results = cache.toArray(new Result[0]); + assertEquals(count, results.length); + for (int i = 0; i < count; i++) { + assertEquals(i, Bytes.toInt(results[i].getRow())); + assertEquals(i, Bytes.toInt(results[i].getValue(CF, CQ1))); + } } @Test public void testCombine1() throws IOException { + cache.clear(); + Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true); Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true); - Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false); + resultCache.loadResultsToCache(new Result[] { previousResult, result1 }, false); + Result[] results = cache.toArray(new Result[0]); assertEquals(1, results.length); assertSame(previousResult, results[0]); - assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); - assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length); - assertEquals(0, resultCache.addAndGet(new Result[0], true).length); + cache.clear(); + resultCache.loadResultsToCache(new Result[] { result2 }, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache(new Result[] { result3 }, false); + assertEquals(0, cache.size()); + 
resultCache.loadResultsToCache(new Result[0], true); + assertEquals(0, cache.size()); - results = resultCache.addAndGet(new Result[0], false); + resultCache.loadResultsToCache(new Result[0], false); + results = cache.toArray(new Result[0]); assertEquals(1, results.length); assertEquals(1, Bytes.toInt(results[0].getRow())); assertEquals(3, results[0].rawCells().length); @@ -100,17 +120,23 @@ public class TestCompleteResultScanResultCache { @Test public void testCombine2() throws IOException { + cache.clear(); + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true); Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true); Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false, false); - assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); - assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); - assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length); + resultCache.loadResultsToCache(new Result[] { result1 }, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache(new Result[] { result2 }, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache(new Result[] { result3 }, false); + assertEquals(0, cache.size()); - Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false); + resultCache.loadResultsToCache(new Result[] { nextResult1 }, false); + Result[] results = cache.toArray(new Result[0]); assertEquals(1, results.length); assertEquals(1, Bytes.toInt(results[0].getRow())); assertEquals(3, results[0].rawCells().length); @@ -118,7 +144,9 @@ public class TestCompleteResultScanResultCache { assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); assertEquals(1, 
Bytes.toInt(results[0].getValue(CF, CQ3))); - results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false); + cache.clear(); + resultCache.loadResultsToCache(new Result[] { nextToNextResult1 }, false); + results = cache.toArray(new Result[0]); assertEquals(2, results.length); assertEquals(2, Bytes.toInt(results[0].getRow())); assertEquals(1, results[0].rawCells().length); @@ -130,16 +158,20 @@ public class TestCompleteResultScanResultCache { @Test public void testCombine3() throws IOException { + cache.clear(); + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false); Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true); - assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); - assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); + resultCache.loadResultsToCache(new Result[] { result1 }, false); + assertEquals(0, cache.size()); + resultCache.loadResultsToCache(new Result[] { result2 }, false); + assertEquals(0, cache.size()); - Result[] results = - resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false); + resultCache.loadResultsToCache(new Result[] { nextResult1, nextToNextResult1 }, false); + Result[] results = cache.toArray(new Result[0]); assertEquals(2, results.length); assertEquals(1, Bytes.toInt(results[0].getRow())); assertEquals(2, results[0].rawCells().length); @@ -149,7 +181,9 @@ public class TestCompleteResultScanResultCache { assertEquals(1, results[1].rawCells().length); assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1))); - results = resultCache.addAndGet(new Result[0], false); + cache.clear(); + resultCache.loadResultsToCache(new Result[0], false); + results = cache.toArray(new Result[0]); assertEquals(1, 
results.length); assertEquals(3, Bytes.toInt(results[0].getRow())); assertEquals(1, results[0].rawCells().length); @@ -158,21 +192,27 @@ public class TestCompleteResultScanResultCache { @Test public void testCombine4() throws IOException { + cache.clear(); + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false); Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true); Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false); - assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + resultCache.loadResultsToCache(new Result[] { result1 }, false); + assertEquals(0, cache.size()); - Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false); + resultCache.loadResultsToCache(new Result[] { result2, nextResult1 }, false); + Result[] results = cache.toArray(new Result[0]); assertEquals(1, results.length); assertEquals(1, Bytes.toInt(results[0].getRow())); assertEquals(2, results[0].rawCells().length); assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); - results = resultCache.addAndGet(new Result[] { nextResult2 }, false); + cache.clear(); + resultCache.loadResultsToCache(new Result[] { nextResult2 }, false); + results = cache.toArray(new Result[0]); assertEquals(1, results.length); assertEquals(2, Bytes.toInt(results[0].getRow())); assertEquals(2, results[0].rawCells().length); -- 2.7.4