.../hbase/regionserver/ImmutableSegment.java | 6 +- .../hbase/regionserver/MemStoreSnapshot.java | 2 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 35 ++++--- .../hadoop/hbase/regionserver/SnapshotScanner.java | 107 +++++++++++++++++++++ .../hbase/regionserver/TestMemStoreChunkPool.java | 6 ++ 5 files changed, 138 insertions(+), 18 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 4cdb29d..bf97a47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -34,7 +34,7 @@ import java.io.IOException; /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, * and is not needed for a {@link MutableSegment}. Specifically, the method - * {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the + * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the * {@link MemStoreSnapshot} object. */ @InterfaceAudience.Private @@ -127,8 +127,8 @@ public class ImmutableSegment extends Segment { * general segment scanner. * @return a special scanner for the MemStoreSnapshot object */ - public KeyValueScanner getKeyValueScanner() { - return new CollectionBackedScanner(getCellSet(), getComparator()); + public KeyValueScanner getSnapshotScanner() { + return new SnapshotScanner(this); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 74d1e17..61e7876 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -40,7 +40,7 @@ public class MemStoreSnapshot { this.dataSize = snapshot.keySize(); this.heapOverhead = snapshot.heapOverhead(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanner = snapshot.getKeyValueScanner(); + this.scanner = snapshot.getSnapshotScanner(); this.tagsPresent = snapshot.isTagsPresent(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 7803f7d..a60f047 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -41,14 +41,14 @@ public class SegmentScanner implements KeyValueScanner { private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE; // the observed structure - private final Segment segment; + protected final Segment segment; // the highest relevant MVCC private long readPoint; // the current iterator that can be reinitialized by // seek(), backwardSeek(), or reseek() - private Iterator iter; + protected Iterator iter; // the pre-calculated cell to be returned by peek() - private Cell current = null; + protected Cell current = null; // or next() // A flag represents whether could stop skipping KeyValues for MVCC // if have encountered the next row. Only used for reversed scan @@ -57,7 +57,7 @@ public class SegmentScanner implements KeyValueScanner { private Cell last = null; // flag to indicate if this scanner is closed - private boolean closed = false; + protected boolean closed = false; protected SegmentScanner(Segment segment, long readPoint) { this(segment, readPoint, DEFAULT_SCANNER_ORDER); @@ -74,7 +74,7 @@ public class SegmentScanner implements KeyValueScanner { this.segment.incScannerCount(); iter = segment.iterator(); // the initialization of the current is required for working with heap of SegmentScanners - current = getNext(); + getNext(); this.scannerOrder = scannerOrder; if (current == null) { // nothing to fetch from this scanner @@ -108,7 +108,7 @@ public class SegmentScanner implements KeyValueScanner { return null; } Cell oldCurrent = current; - current = getNext(); // update the currently observed Cell + getNext(); // update the currently observed Cell return oldCurrent; } @@ -127,13 +127,17 @@ public class SegmentScanner implements KeyValueScanner { return false; } // restart the iterator from new key - iter = segment.tailSet(cell).iterator(); + iter = getIterator(cell); // last is going to be reinitialized in the next getNext() call last = null; - current = getNext(); + getNext(); return (current != null); } + protected Iterator getIterator(Cell cell) { + return segment.tailSet(cell).iterator(); + } + /** * Reseek the scanner at or after the specified KeyValue. * This method is guaranteed to seek at or after the required key only if the @@ -156,8 +160,8 @@ public class SegmentScanner implements KeyValueScanner { get it. So we remember the last keys we iterated to and restore the reseeked set to at least that point. */ - iter = segment.tailSet(getHighest(cell, last)).iterator(); - current = getNext(); + iter = getIterator(getHighest(cell, last)); + getNext(); return (current != null); } @@ -355,7 +359,7 @@ public class SegmentScanner implements KeyValueScanner { * Private internal method for iterating over the segment, * skipping the cells with irrelevant MVCC */ - private Cell getNext() { + protected Cell getNext() { Cell startKV = current; Cell next = null; @@ -363,22 +367,25 @@ public class SegmentScanner implements KeyValueScanner { while (iter.hasNext()) { next = iter.next(); if (next.getSequenceId() <= this.readPoint) { - return next; // skip irrelevant versions + current = next; + return current;// skip irrelevant versions } if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the startKV != null && // boundaries of a single row segment.compareRows(next, startKV) > 0) { - return null; + current = null; + return current; } } // end of while - return null; // nothing found + current = null; // nothing found } finally { if (next != null) { // in all cases, remember the last KV we iterated to, needed for reseek() last = next; } } + return current; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java new file mode 100644 index 0000000..d92acf4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +/** + * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells + * in the segment + */ +@InterfaceAudience.Private +public class SnapshotScanner extends SegmentScanner { + + public SnapshotScanner(Segment immutableSegment) { + // Snapshot scanner does not need readpoint. It should read all the cells in the + // segment + super(immutableSegment, Long.MAX_VALUE); + } + + @Override + public Cell peek() { // sanity check, the current should be always valid + if (closed) { + return null; + } + return current; + } + + @Override + public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { + return true; + } + + @Override + public boolean backwardSeek(Cell key) throws IOException { + throw new NotImplementedException( + "backwardSeek must not be called on a " + "non-reversed scanner"); + } + + @Override + public boolean seekToPreviousRow(Cell key) throws IOException { + throw new NotImplementedException( + "seekToPreviousRow must not be called on a " + "non-reversed scanner"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException( + "seekToLastRow must not be called on a " + "non-reversed scanner"); + } + + @Override + protected Iterator getIterator(Cell cell) { + return segment.iterator(); + } + + @Override + protected Cell getNext() { + Cell oldCurrent = current; + if (iter.hasNext()) { + current = iter.next(); + } else { + current = null; + } + return oldCurrent; + } + + @Override + public boolean seek(Cell seekCell) { + // restart iterator + iter = getIterator(seekCell); + return reseek(seekCell); + } + + @Override + public boolean reseek(Cell seekCell) { + while (iter.hasNext()) { + Cell next = iter.next(); + int ret = segment.getComparator().compare(next, seekCell); + if (ret >= 0) { + current = next; + return true; + } + } + return false; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index e2ba169..49b5139 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -135,6 +135,8 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanner - this is how the snapshot will be used + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -177,6 +179,8 @@ public class TestMemStoreChunkPool { List scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data + // close the snapshot scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -203,6 +207,8 @@ public class TestMemStoreChunkPool { } // Since no opening scanner, the chunks of snapshot should be put back to // pool + // close the snapshot scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); }