.../hbase/regionserver/FlushStoreScanner.java | 88 ++++++++++++++++++++++ .../hadoop/hbase/regionserver/StoreFlusher.java | 3 +- .../hadoop/hbase/regionserver/StoreScanner.java | 2 +- 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushStoreScanner.java new file mode 100644 index 0000000..01fc563 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushStoreScanner.java @@ -0,0 +1,88 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; + +/** + * Special extension of StoreScanner that does not have the overhead of checking if the matcher has + * reached the nextRow. It just works on the fact that we keep iterating the KVHeap till the block + * size is reached and if so we return those results or if already the end is reached we return + * those results + */ +@InterfaceAudience.Private +public class FlushStoreScanner extends StoreScanner { + + private long sizeLimit; + /** + * Used for flushes. + *

+ * Opens a scanner across all memstore segments in the snapshot. + * @param store who we scan + * @param scan the spec + * @param scanners ancillary scanners + * @param smallestReadPoint the readPoint that we should use for tracking versions + */ + public FlushStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + List scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs) throws IOException { + super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs); + this.sizeLimit = this.store.getFamily().getBlocksize(); + } + + @Override + public boolean next(List outResult, ScannerContext scannerContext) throws IOException { + // if the heap was left null, then the scanners had previously run out anyways, close and + // return. + if (this.heap == null) { + // By this time partial close should happened because already heap is null + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + + Cell cell = this.heap.peek(); + if (cell == null) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + long bytesFlushed = 0; + do { + outResult.add(cell); + int len = KeyValueUtil.length(cell); + bytesFlushed += len; + // do next + this.heap.next(); + if (bytesFlushed >= this.sizeLimit) { + // return from the loop + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + } + } while ((cell = heap.peek()) != null); + // No more keys + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index abfd3fc..8c44e0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -88,7 +88,8 @@ abstract class StoreFlusher { if (scanner == null) { Scan scan = new Scan(); scan.setMaxVersions(store.getScanInfo().getMaxVersions()); - scanner = new StoreScanner(store, store.getScanInfo(), scan, + // create a store scanner that is specific for flushes + scanner = new FlushStoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7e08eca..f7e47b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -458,7 +458,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner close(true); } - private void close(boolean withHeapClose) { + protected void close(boolean withHeapClose) { if (this.closing) { return; }