From 82ed3e751cd1ec63ada20a3a61239019f82519f4 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 16 Mar 2017 15:25:56 +0800 Subject: [PATCH] HBASE-17793 Backport ScanResultCache related code to branch-1 --- .../hbase/client/AllowPartialScanResultCache.java | 73 ++++++ .../hadoop/hbase/client/BatchScanResultCache.java | 142 ++++++++++++ .../apache/hadoop/hbase/client/ClientScanner.java | 254 +-------------------- .../hbase/client/CompleteScanResultCache.java | 110 +++++++++ .../hadoop/hbase/client/ConnectionUtils.java | 48 ++++ .../org/apache/hadoop/hbase/client/Result.java | 69 +++--- .../hadoop/hbase/client/ScanResultCache.java | 53 +++++ .../client/TestAllowPartialScanResultCache.java | 91 ++++++++ .../hbase/client/TestBatchScanResultCache.java | 113 +++++++++ .../client/TestCompleteResultScanResultCache.java | 182 +++++++++++++++ 10 files changed, 858 insertions(+), 277 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java new file mode 100644 index 0000000..82f1ea0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A ScanResultCache that may return partial result. + *
+ * As we can only scan from the starting of a row when error, so here we also implement the logic + * that skips the cells that have already been returned. + */ +@InterfaceAudience.Private +class AllowPartialScanResultCache implements ScanResultCache { + + // used to filter out the cells that already returned to user as we always start from the + // beginning of a row when retry. + private Cell lastCell; + + private void updateLastCell(Result result) { + lastCell = result.rawCells()[result.rawCells().length - 1]; + } + + @Override + public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + if (results.length == 0) { + return EMPTY_RESULT_ARRAY; + } + int i; + for (i = 0; i < results.length; i++) { + Result r = filterCells(results[i], lastCell); + if (r != null) { + results[i] = r; + break; + } + } + if (i == results.length) { + return EMPTY_RESULT_ARRAY; + } + updateLastCell(results[results.length - 1]); + if (i > 0) { + return Arrays.copyOfRange(results, i, results.length); + } else { + return results; + } + } + + @Override + public void clear() { + // we do not cache anything + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java new file mode 100644 index 0000000..9ab959b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A scan result cache for batched scan, i.e, + * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}. + *
+ * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. setBatch + * doesn't mean setAllowPartialResult(true). + */ +@InterfaceAudience.Private +public class BatchScanResultCache implements ScanResultCache { + + private final int batch; + + // used to filter out the cells that already returned to user as we always start from the + // beginning of a row when retry. + private Cell lastCell; + + private final Deque partialResults = new ArrayDeque<>(); + + private int numCellsOfPartialResults; + + public BatchScanResultCache(int batch) { + this.batch = batch; + } + + private void updateLastCell(Result result) { + lastCell = result.rawCells()[result.rawCells().length - 1]; + } + + private Result createCompletedResult() throws IOException { + Result result = Result.createCompleteResult(partialResults); + partialResults.clear(); + numCellsOfPartialResults = 0; + return result; + } + + // Add new result to the partial list and return a batched Result if caching size exceed batching + // limit. As the RS will also respect the scan.getBatch, we can make sure that we will get only + // one Result back at most(or null, which means we do not have enough cells). + private Result regroupResults(Result result) { + partialResults.addLast(result); + numCellsOfPartialResults += result.size(); + if (numCellsOfPartialResults < batch) { + return null; + } + Cell[] cells = new Cell[batch]; + int cellCount = 0; + boolean stale = false; + for (;;) { + Result r = partialResults.pollFirst(); + stale = stale || r.isStale(); + int newCellCount = cellCount + r.size(); + if (newCellCount > batch) { + // We have more cells than expected, so split the current result + int len = batch - cellCount; + System.arraycopy(r.rawCells(), 0, cells, cellCount, len); + Cell[] remainingCells = new Cell[r.size() - len]; + System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len); + partialResults.addFirst( + Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow())); + break; + } + System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size()); + if (newCellCount == batch) { + break; + } + cellCount = newCellCount; + } + numCellsOfPartialResults -= batch; + return Result.create(cells, null, stale, + result.mayHaveMoreCellsInRow() || !partialResults.isEmpty()); + } + + @Override + public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + if (results.length == 0) { + if (!partialResults.isEmpty() && !isHeartbeatMessage) { + return new Result[] { createCompletedResult() }; + } + return EMPTY_RESULT_ARRAY; + } + List regroupedResults = new ArrayList<>(); + for (Result result : results) { + result = filterCells(result, lastCell); + if (result == null) { + continue; + } + // check if we have a row change + if (!partialResults.isEmpty() && + !Bytes.equals(partialResults.peek().getRow(), result.getRow())) { + regroupedResults.add(createCompletedResult()); + } + Result regroupedResult = regroupResults(result); + if (regroupedResult != null) { + regroupedResults.add(regroupedResult); + // only update last cell when we actually return it to user. 
+ updateLastCell(regroupedResult); + } + if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) { + // We are done for this row + regroupedResults.add(createCompletedResult()); + } + } + return regroupedResults.toArray(new Result[0]); + } + + @Override + public void clear() { + partialResults.clear(); + numCellsOfPartialResults = 0; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 01ea993..57586d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,16 +17,15 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; -import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.commons.lang.mutable.MutableBoolean; @@ -34,13 +33,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue.MetaComparator; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; @@ -69,23 +66,7 @@ public abstract class ClientScanner extends AbstractClientScanner { protected HRegionInfo currentRegion = null; protected ScannerCallableWithReplicas callable = null; protected final LinkedList cache = new LinkedList(); - /** - * A list of partial results that have been returned from the server. This list should only - * contain results if this scanner does not have enough partial results to form the complete - * result. - */ - protected final LinkedList partialResults = new LinkedList(); - protected int partialResultsCellSizes = 0; - /** - * The row for which we are accumulating partial Results (i.e. the row of the Results stored - * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via - * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()} - */ - protected byte[] partialResultsRow = null; - /** - * The last cell from a not full Row which is added to cache - */ - protected Cell lastCellLoadedToCache = null; + private final ScanResultCache scanResultCache; protected final int caching; protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. 
@@ -106,7 +87,6 @@ public abstract class ClientScanner extends AbstractClientScanner { protected final int primaryOperationTimeout; private int retries; protected final ExecutorService pool; - private static MetaComparator metaComparator = new MetaComparator(); /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start @@ -159,6 +139,7 @@ public abstract class ClientScanner extends AbstractClientScanner { this.rpcControllerFactory = controllerFactory; this.conf = conf; + this.scanResultCache = createScanResultCache(scan); } protected ClusterConnection getConnection() { @@ -361,14 +342,7 @@ public abstract class ClientScanner extends AbstractClientScanner { private void closeScannerIfExhausted(boolean exhausted) throws IOException { if (exhausted) { - if (!partialResults.isEmpty()) { - // XXX: continue if there are partial results. But in fact server should not set - // hasMoreResults to false if there are partial results. - LOG.warn("Server tells us there is no more results for this region but we still have" + - " partialResults, this should not happen, retry on the current scanner anyway"); - } else { - closeScanner(); - } + closeScanner(); } } @@ -376,7 +350,7 @@ public abstract class ClientScanner extends AbstractClientScanner { MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException { // An exception was thrown which makes any partial results that we were collecting // invalid. The scanner will need to be reset to the beginning of a row. - clearPartialResults(); + scanResultCache.clear(); // Unfortunately, DNRIOE is used in two different semantics. // (1) The first is to close the client scanner and bubble up the exception all the way @@ -470,7 +444,7 @@ public abstract class ClientScanner extends AbstractClientScanner { if (callable.switchedToADifferentReplica()) { // Any accumulated partial results are no longer valid since the callable will // openScanner with the correct startkey and we must pick up from there - clearPartialResults(); + scanResultCache.clear(); this.currentRegion = callable.getHRegionInfo(); } retryAfterOutOfOrderException.setValue(true); @@ -490,28 +464,20 @@ public abstract class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. 
- List resultsToAddToCache = - getResultsToAddToCache(values, callable.isHeartbeatMessage()); - if (!resultsToAddToCache.isEmpty()) { + Result[] resultsToAddToCache = + scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); + if (resultsToAddToCache.length > 0) { for (Result rs : resultsToAddToCache) { - rs = filterLoadedCell(rs); - if (rs == null) { - continue; - } cache.add(rs); for (Cell cell : rs.rawCells()) { remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); } countdown--; this.lastResult = rs; - if (this.lastResult.mayHaveMoreCellsInRow()) { - updateLastCellLoadedToCache(this.lastResult); - } else { - this.lastCellLoadedToCache = null; - } } - if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) { - int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache); + if (scan.getLimit() > 0) { + int newLimit = + scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache)); assert newLimit >= 0; scan.setLimit(newLimit); } @@ -554,13 +520,6 @@ public abstract class ClientScanner extends AbstractClientScanner { } // we are done with the current region if (regionExhausted) { - if (!partialResults.isEmpty()) { - // XXX: continue if there are partial results. But in fact server should not set - // hasMoreResults to false if there are partial results. - LOG.warn("Server tells us there is no more results for this region but we still have" + - " partialResults, this should not happen, retry on the current scanner anyway"); - continue; - } if (!moveToNextRegion()) { break; } @@ -568,141 +527,6 @@ public abstract class ClientScanner extends AbstractClientScanner { } } - /** - * This method ensures all of our book keeping regarding partial results is kept up to date. This - * method should be called once we know that the results we received back from the RPC request do - * not contain errors. We return a list of results that should be added to the cache. In general, - * this list will contain all NON-partial results from the input array (unless the client has - * specified that they are okay with receiving partial results) - * @param resultsFromServer The array of {@link Result}s returned from the server - * @param heartbeatMessage Flag indicating whether or not the response received from the server - * represented a complete response, or a heartbeat message that was sent to keep the - * client-server connection alive - * @return the list of results that should be added to the cache. - * @throws IOException - */ - protected List getResultsToAddToCache(Result[] resultsFromServer, - boolean heartbeatMessage) throws IOException { - int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; - List resultsToAddToCache = new ArrayList(resultSize); - - // If the caller has indicated in their scan that they are okay with seeing partial results, - // then simply add all results to the list. Note allowPartial and setBatch are not same, we can - // return here if allow partials and we will handle batching later. - if (scan.getAllowPartialResults()) { - addResultsToList(resultsToAddToCache, resultsFromServer, 0, - (null == resultsFromServer ? 
0 : resultsFromServer.length)); - return resultsToAddToCache; - } - - // If no results were returned it indicates that either we have the all the partial results - // necessary to construct the complete result or the server had to send a heartbeat message - // to the client to keep the client-server connection alive - if (resultsFromServer == null || resultsFromServer.length == 0) { - // If this response was an empty heartbeat message, then we have not exhausted the region - // and thus there may be more partials server side that still need to be added to the partial - // list before we form the complete Result - if (!partialResults.isEmpty() && !heartbeatMessage) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - - return resultsToAddToCache; - } - - for(Result result : resultsFromServer) { - if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) { - // We have a new row, complete the previous row. - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - Result res = regroupResults(result); - if (res != null) { - resultsToAddToCache.add(res); - } - if (!result.mayHaveMoreCellsInRow()) { - // We are done for this row - if (partialResultsCellSizes > 0) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - } - clearPartialResults(); - } - } - - return resultsToAddToCache; - } - - /** - * Add new result to the partial list and return a batched Result if caching size exceed - * batching limit. - * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. - * setBatch doesn't mean setAllowPartialResult(true) - * @param result The result that we want to add to our list of partial Results - * @return the result if we have batch limit and there is one Result can be returned to user, or - * null if we have not. - * @throws IOException - */ - private Result regroupResults(final Result result) throws IOException { - partialResultsRow = result.getRow(); - partialResults.add(result); - partialResultsCellSizes += result.size(); - if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) { - Cell[] cells = new Cell[scan.getBatch()]; - int count = 0; - boolean stale = false; - while (count < scan.getBatch()) { - Result res = partialResults.poll(); - stale = stale || res.isStale(); - if (res.size() + count <= scan.getBatch()) { - System.arraycopy(res.rawCells(), 0, cells, count, res.size()); - count += res.size(); - } else { - int len = scan.getBatch() - count; - System.arraycopy(res.rawCells(), 0, cells, count, len); - Cell[] remainingCells = new Cell[res.size() - len]; - System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len); - Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(), - res.mayHaveMoreCellsInRow()); - partialResults.addFirst(remainingRes); - count = scan.getBatch(); - } - } - partialResultsCellSizes -= scan.getBatch(); - if (partialResultsCellSizes == 0) { - // We have nothing in partialResults, clear the flags to prevent returning empty Result - // when next result belongs to the next row. - clearPartialResults(); - } - return Result.create(cells, null, stale, - partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow()); - } - return null; - } - - /** - * Convenience method for clearing the list of partials and resetting the partialResultsRow. 
- */ - private void clearPartialResults() { - partialResults.clear(); - partialResultsCellSizes = 0; - partialResultsRow = null; - } - - /** - * Helper method for adding results between the indices [start, end) to the outputList - * @param outputList the list that results will be added to - * @param inputArray the array that results are taken from - * @param start beginning index (inclusive) - * @param end ending index (exclusive) - */ - private void addResultsToList(List outputList, Result[] inputArray, int start, int end) { - if (inputArray == null || start < 0 || end > inputArray.length) return; - - for (int i = start; i < end; i++) { - outputList.add(inputArray[i]); - } - } - @Override public void close() { if (!scanMetricsPublished) writeScanMetrics(); @@ -739,58 +563,4 @@ public abstract class ClientScanner extends AbstractClientScanner { } return false; } - - protected void updateLastCellLoadedToCache(Result result) { - if (result.rawCells().length == 0) { - return; - } - this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1]; - } - - /** - * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not - * columns. - */ - private int compare(Cell a, Cell b) { - int r = 0; - if (currentRegion != null && currentRegion.isMetaRegion()) { - r = metaComparator.compareRows(a, b); - } else { - r = CellComparator.compareRows(a, b); - } - if (r != 0) { - return this.scan.isReversed() ? -r : r; - } - return CellComparator.compareWithoutRow(a, b); - } - - private Result filterLoadedCell(Result result) { - // we only filter result when last result is partial - // so lastCellLoadedToCache and result should have same row key. - // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region; - // 3) read more cell. lastCellLoadedToCache and result will be not at same row. - if (lastCellLoadedToCache == null || result.rawCells().length == 0) { - return result; - } - if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) { - // The first cell of this result is larger than the last cell of loadcache. - // If user do not allow partial result, it must be true. - return result; - } - if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) { - // The last cell of this result is smaller than the last cell of loadcache, skip all. - return null; - } - - // The first one must not in filtered result, we start at the second. - int index = 1; - while (index < result.rawCells().length) { - if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) { - break; - } - index++; - } - Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); - return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow()); - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java new file mode 100644 index 0000000..e09ddfb --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A scan result cache that only returns complete result. + */ +@InterfaceAudience.Private +class CompleteScanResultCache implements ScanResultCache { + + private final List partialResults = new ArrayList<>(); + + private Result combine() throws IOException { + Result result = Result.createCompleteResult(partialResults); + partialResults.clear(); + return result; + } + + private Result[] prependCombined(Result[] results, int length) throws IOException { + if (length == 0) { + return new Result[] { combine() }; + } + // the last part of a partial result may not be marked as partial so here we need to check if + // there is a row change. + int start; + if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) { + partialResults.add(results[0]); + start = 1; + length--; + } else { + start = 0; + } + Result[] prependResults = new Result[length + 1]; + prependResults[0] = combine(); + System.arraycopy(results, start, prependResults, 1, length); + return prependResults; + } + + @Override + public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive + if (results.length == 0) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partialResults.isEmpty() && !isHeartbeatMessage) { + return new Result[] { combine() }; + } + return EMPTY_RESULT_ARRAY; + } + // In every RPC response there should be at most a single partial result. Furthermore, if + // there is a partial result, it is guaranteed to be in the last position of the array. 
+ Result last = results[results.length - 1]; + if (last.mayHaveMoreCellsInRow()) { + if (partialResults.isEmpty()) { + partialResults.add(last); + return Arrays.copyOf(results, results.length - 1); + } + // We have only one result and it is partial + if (results.length == 1) { + // check if there is a row change + if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) { + partialResults.add(last); + return EMPTY_RESULT_ARRAY; + } + Result completeResult = combine(); + partialResults.add(last); + return new Result[] { completeResult }; + } + // We have some complete results + Result[] resultsToReturn = prependCombined(results, results.length - 1); + partialResults.add(last); + return resultsToReturn; + } + if (!partialResults.isEmpty()) { + return prependCombined(results, results.length); + } + return results; + } + + @Override + public void clear() { + partialResults.clear(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 97f71e7..1bdc5fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -24,12 +24,16 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; @@ -239,6 +243,40 @@ public class ConnectionUtils { return Bytes.equals(row, EMPTY_END_ROW); } + private static final Comparator COMPARE_WITHOUT_ROW = new Comparator() { + + @Override + public int compare(Cell o1, Cell o2) { + return CellComparator.compareWithoutRow(o1, o2); + } + }; + + static Result filterCells(Result result, Cell keepCellsAfter) { + if (keepCellsAfter == null) { + // do not need to filter + return result; + } + // not the same row + if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) { + return result; + } + Cell[] rawCells = result.rawCells(); + int index = Arrays.binarySearch(rawCells, keepCellsAfter, COMPARE_WITHOUT_ROW); + if (index < 0) { + index = -index - 1; + } else { + index++; + } + if (index == 0) { + return result; + } + if (index == rawCells.length) { + return null; + } + return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, + result.isStale(), result.mayHaveMoreCellsInRow()); + } + static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) { if (isEmptyStopRow(info.getEndKey())) { return true; @@ -287,4 +325,14 @@ public class ConnectionUtils { } return count; } + + public static ScanResultCache createScanResultCache(Scan scan) { + if (scan.getAllowPartialResults()) { + return new AllowPartialScanResultCache(); + } else if (scan.getBatch() > 0) { + return new BatchScanResultCache(scan.getBatch()); + } else { + return new CompleteScanResultCache(); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 3483d26..4c67c50 
100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,7 +24,9 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -857,44 +858,42 @@ public class Result implements CellScannable, CellScanner { * @throws IOException A complete result cannot be formed because the results in the partial list * come from different rows */ - public static Result createCompleteResult(List partialResults) + public static Result createCompleteResult(Iterable partialResults) throws IOException { - List cells = new ArrayList(); + if (partialResults == null) { + return Result.create(Collections. emptyList(), null, false); + } + List cells = new ArrayList<>(); boolean stale = false; byte[] prevRow = null; byte[] currentRow = null; - - if (partialResults != null && !partialResults.isEmpty()) { - for (int i = 0; i < partialResults.size(); i++) { - Result r = partialResults.get(i); - currentRow = r.getRow(); - if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { - throw new IOException( - "Cannot form complete result. Rows of partial results do not match." + - " Partial Results: " + partialResults); - } - - // Ensure that all Results except the last one are marked as partials. The last result - // may not be marked as a partial because Results are only marked as partials when - // the scan on the server side must be stopped due to reaching the maxResultSize. - // Visualizing it makes it easier to understand: - // maxResultSize: 2 cells - // (-x-) represents cell number x in a row - // Example: row1: -1- -2- -3- -4- -5- (5 cells total) - // How row1 will be returned by the server as partial Results: - // Result1: -1- -2- (2 cells, size limit reached, mark as partial) - // Result2: -3- -4- (2 cells, size limit reached, mark as partial) - // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) { - throw new IOException( - "Cannot form complete result. Result is missing partial flag. " + - "Partial Results: " + partialResults); - } - prevRow = currentRow; - stale = stale || r.isStale(); - for (Cell c : r.rawCells()) { - cells.add(c); - } + for (Iterator iter = partialResults.iterator(); iter.hasNext();) { + Result r = iter.next(); + currentRow = r.getRow(); + if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { + throw new IOException( + "Cannot form complete result. Rows of partial results do not match." + + " Partial Results: " + partialResults); + } + // Ensure that all Results except the last one are marked as partials. The last result + // may not be marked as a partial because Results are only marked as partials when + // the scan on the server side must be stopped due to reaching the maxResultSize. 
+ // Visualizing it makes it easier to understand: + // maxResultSize: 2 cells + // (-x-) represents cell number x in a row + // Example: row1: -1- -2- -3- -4- -5- (5 cells total) + // How row1 will be returned by the server as partial Results: + // Result1: -1- -2- (2 cells, size limit reached, mark as partial) + // Result2: -3- -4- (2 cells, size limit reached, mark as partial) + // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) + if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) { + throw new IOException("Cannot form complete result. Result is missing partial flag. " + + "Partial Results: " + partialResults); + } + prevRow = currentRow; + stale = stale || r.isStale(); + for (Cell c : r.rawCells()) { + cells.add(c); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java new file mode 100644 index 0000000..2366b57 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Used to separate the row constructing logic. + *
+ * After we add heartbeat support for scan, RS may return partial result even if allowPartial is
+ * false and batch is 0. With this interface, the implementation now looks like:
+ * <ol>
+ * <li>Get results from ScanResponse proto.</li>
+ * <li>Pass them to ScanResultCache and get something back.</li>
+ * <li>If we actually get something back, then pass it to ScanObserver.</li>
+ * </ol>
+ */ +@InterfaceAudience.Private +interface ScanResultCache { + + static final Result[] EMPTY_RESULT_ARRAY = new Result[0]; + + /** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param isHeartbeatMessage indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ + Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException; + + /** + * Clear the cached result if any. Called when scan error and we will start from a start of a row + * again. + */ + void clear(); +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java new file mode 100644 index 0000000..3fe43a5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestAllowPartialScanResultCache { + + private static byte[] CF = Bytes.toBytes("cf"); + + private AllowPartialScanResultCache resultCache; + + @Before + public void setUp() { + resultCache = new AllowPartialScanResultCache(); + } + + @After + public void tearDown() { + resultCache.clear(); + resultCache = null; + } + + @Test + public void test() throws IOException { + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + + Cell[] cells1 = createCells(CF, 1, 10); + Cell[] cells2 = createCells(CF, 2, 10); + + Result[] results1 = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false); + assertEquals(1, results1.length); + assertEquals(1, Bytes.toInt(results1[0].getRow())); + assertEquals(5, results1[0].rawCells().length); + for (int i = 0; i < 5; i++) { + assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i)))); + } + + Result[] results2 = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false); + assertEquals(1, results2.length); + assertEquals(1, Bytes.toInt(results2[0].getRow())); + assertEquals(5, results2[0].rawCells().length); + for (int i = 5; i < 10; i++) { + assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i)))); + } + + Result[] results3 = + resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false); + assertEquals(1, results3.length); + assertEquals(2, Bytes.toInt(results3[0].getRow())); + assertEquals(10, results3[0].rawCells().length); + for (int i = 0; i < 10; i++) { + assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i)))); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java new file mode 100644 index 0000000..31a4594 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestBatchScanResultCache { + + private static byte[] CF = Bytes.toBytes("cf"); + + private BatchScanResultCache resultCache; + + @Before + public void setUp() { + resultCache = new BatchScanResultCache(4); + } + + @After + public void tearDown() { + resultCache.clear(); + resultCache = null; + } + + static Cell createCell(byte[] cf, int key, int cq) { + return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), Bytes.toBytes(key)); + } + + static Cell[] createCells(byte[] cf, int key, int numCqs) { + Cell[] cells = new Cell[numCqs]; + for (int i = 0; i < numCqs; i++) { + cells[i] = createCell(cf, key, i); + } + return cells; + } + + private void assertResultEquals(Result result, int key, int start, int to) { + assertEquals(to - start, result.size()); + for (int i = start; i < to; i++) { + assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + i)))); + } + assertEquals(to - start == 4, result.mayHaveMoreCellsInRow()); + } + + @Test + public void test() throws IOException { + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + + Cell[] cells1 = createCells(CF, 1, 10); + Cell[] cells2 = createCells(CF, 2, 10); + Cell[] cells3 = createCells(CF, 3, 10); + assertEquals(0, resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length); + Result[] results = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true), + Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) }, + false); + assertEquals(2, results.length); + assertResultEquals(results[0], 1, 0, 4); + assertResultEquals(results[1], 1, 4, 8); + results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false); + assertEquals(1, results.length); + assertResultEquals(results[0], 1, 8, 10); + + results = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true), + Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true), + Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true), + Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true), + Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true), + Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) }, + false); 
+ assertEquals(6, results.length); + assertResultEquals(results[0], 2, 0, 4); + assertResultEquals(results[1], 2, 4, 8); + assertResultEquals(results[2], 2, 8, 10); + assertResultEquals(results[3], 3, 0, 4); + assertResultEquals(results[4], 3, 4, 8); + assertResultEquals(results[5], 3, 8, 10); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java new file mode 100644 index 0000000..8759593 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestCompleteResultScanResultCache { + + private static byte[] CF = Bytes.toBytes("cf"); + + private static byte[] CQ1 = Bytes.toBytes("cq1"); + + private static byte[] CQ2 = Bytes.toBytes("cq2"); + + private static byte[] CQ3 = Bytes.toBytes("cq3"); + + private CompleteScanResultCache resultCache; + + @Before + public void setUp() { + resultCache = new CompleteScanResultCache(); + } + + @After + public void tearDown() { + resultCache.clear(); + resultCache = null; + } + + private static Cell createCell(int key, byte[] cq) { + return new KeyValue(Bytes.toBytes(key), CF, cq, Bytes.toBytes(key)); + } + + @Test + public void testNoPartial() throws IOException { + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + int count = 10; + Result[] results = new Result[count]; + for (int i = 0; i < count; i++) { + results[i] = Result.create(Arrays.asList(createCell(i, CQ1))); + } + assertSame(results, resultCache.addAndGet(results, false)); + } + + @Test + public void testCombine1() throws IOException { + Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true); + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = 
Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); + Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true); + Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false); + assertEquals(1, results.length); + assertSame(previousResult, results[0]); + + assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[0], true).length); + + results = resultCache.addAndGet(new Result[0], false); + assertEquals(1, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(3, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3))); + } + + @Test + public void testCombine2() throws IOException { + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); + Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true); + Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true); + Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false, false); + + assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length); + + Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false); + assertEquals(1, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(3, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3))); + + results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false); + assertEquals(2, results.length); + assertEquals(2, Bytes.toInt(results[0].getRow())); + assertEquals(1, results[0].rawCells().length); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(3, Bytes.toInt(results[1].getRow())); + assertEquals(1, results[1].rawCells().length); + assertEquals(3, Bytes.toInt(results[1].getValue(CF, CQ2))); + } + + @Test + public void testCombine3() throws IOException { + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); + Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false); + Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true); + + assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); + + Result[] results = + resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false); + assertEquals(2, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + assertEquals(2, 
Bytes.toInt(results[1].getRow())); + assertEquals(1, results[1].rawCells().length); + assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1))); + + results = resultCache.addAndGet(new Result[0], false); + assertEquals(1, results.length); + assertEquals(3, Bytes.toInt(results[0].getRow())); + assertEquals(1, results[0].rawCells().length); + assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1))); + } + + @Test + public void testCombine4() throws IOException { + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false); + Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true); + Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false); + + assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + + Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false); + assertEquals(1, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + + results = resultCache.addAndGet(new Result[] { nextResult2 }, false); + assertEquals(1, results.length); + assertEquals(2, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ2))); + } +} -- 2.7.4
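
For reviewers who want to exercise the new classes directly: a minimal sketch, not part of
the patch, showing how ConnectionUtils.createScanResultCache(...) above picks an
implementation from the Scan settings. The ScanResultCache interface is package-private,
so the sketch assumes it is compiled into org.apache.hadoop.hbase.client; the class name
is invented for illustration.

package org.apache.hadoop.hbase.client;

// Hypothetical usage sketch for HBASE-17793; not part of the patch.
public class ScanResultCacheSelectionSketch {
  public static void main(String[] args) {
    Scan partial = new Scan();
    partial.setAllowPartialResults(true);
    // Partial results allowed: Results are forwarded as they arrive,
    // minus any cells already returned before a retry.
    System.out.println(ConnectionUtils.createScanResultCache(partial).getClass().getSimpleName());
    // -> AllowPartialScanResultCache

    Scan batched = new Scan();
    batched.setBatch(5);
    // Batched scan: cells are regrouped into Results of exactly batch cells.
    System.out.println(ConnectionUtils.createScanResultCache(batched).getClass().getSimpleName());
    // -> BatchScanResultCache

    // Neither flag set: partial Results are buffered until the row is complete.
    System.out.println(ConnectionUtils.createScanResultCache(new Scan()).getClass().getSimpleName());
    // -> CompleteScanResultCache
  }
}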
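
The 3+5+5+5+3 to 5+5+5+5+1 regrouping promised in the BatchScanResultCache javadoc can be
replayed against the class itself. Another hedged sketch under the same package
assumption; the class and helper names are invented, and the zero-padded qualifiers keep
the fabricated cells in the sorted order a region server would actually return.

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch replaying the BatchScanResultCache javadoc scenario.
public class BatchRegroupSketch {

  private static Cell cell(byte[] cf, int row, int col) {
    // Zero-padded qualifier keeps the cells sorted by qualifier bytes.
    return new KeyValue(Bytes.toBytes(row), cf, Bytes.toBytes(String.format("cq%02d", col)),
        Bytes.toBytes(row));
  }

  public static void main(String[] args) throws IOException {
    byte[] cf = Bytes.toBytes("cf");
    // 21 cells in a single row, which the server hands back in chunks of 3+5+5+5+3.
    Cell[] row = new Cell[21];
    for (int i = 0; i < row.length; i++) {
      row[i] = cell(cf, 1, i);
    }
    BatchScanResultCache cache = new BatchScanResultCache(5);
    int[] chunks = { 3, 5, 5, 5, 3 }; // cells per RPC response from the server
    int offset = 0;
    for (int i = 0; i < chunks.length; i++) {
      boolean moreCellsInRow = i < chunks.length - 1; // only the last chunk closes the row
      Result[] fromServer = { Result.create(
          Arrays.copyOfRange(row, offset, offset + chunks[i]), null, false, moreCellsInRow) };
      offset += chunks[i];
      for (Result r : cache.addAndGet(fromServer, false)) {
        System.out.println(r.size()); // prints 5, 5, 5, 5 and finally 1
      }
    }
  }
}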
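
And for the default path, a sketch of CompleteScanResultCache holding back the partial
pieces of row 1 until the arrival of row 2 proves that row 1 is finished; same package
assumption, invented names. The included TestCompleteResultScanResultCache covers the
same ground more thoroughly.

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch: partials are buffered until the row is known to be complete.
public class CompleteResultSketch {

  private static Result result(int row, String cq, boolean moreCellsInRow) {
    Cell c = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("cf"), Bytes.toBytes(cq),
        Bytes.toBytes(row));
    return Result.create(Arrays.asList(c), null, false, moreCellsInRow);
  }

  public static void main(String[] args) throws IOException {
    CompleteScanResultCache cache = new CompleteScanResultCache();
    // Row 1 arrives in two partial pieces; nothing is handed back yet.
    System.out.println(cache.addAndGet(new Result[] { result(1, "cq1", true) }, false).length); // 0
    System.out.println(cache.addAndGet(new Result[] { result(1, "cq2", true) }, false).length); // 0
    // Row 2 arrives complete, which also proves row 1 is done: both come out together.
    Result[] out = cache.addAndGet(new Result[] { result(2, "cq1", false) }, false);
    System.out.println(out.length);    // 2
    System.out.println(out[0].size()); // 2 cells, the combined row 1
    System.out.println(out[1].size()); // 1 cell, row 2
  }
}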