.../regionserver/CompositeImmutableSegment.java | 21 +--- .../hbase/regionserver/ImmutableSegment.java | 8 +- .../hbase/regionserver/MemStoreSnapshot.java | 2 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 35 ++++--- .../hadoop/hbase/regionserver/SnapshotScanner.java | 107 +++++++++++++++++++++ .../hbase/regionserver/TestCompactingMemStore.java | 10 ++ .../TestCompactingToCellArrayMapMemStore.java | 102 ++++++++++++++++++++ .../hbase/regionserver/TestMemStoreChunkPool.java | 6 ++ 8 files changed, 254 insertions(+), 37 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index 4fdd2d0..e90b7ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; @@ -50,12 +49,6 @@ public class CompositeImmutableSegment extends ImmutableSegment { private final TimeRangeTracker timeRangeTracker; private long keySize = 0; - // This scanner need to be remembered in order to close it when the snapshot is cleared. - // Initially CollectionBackedScanner didn't raise the scanner counters thus there was no - // need to close it. Now when MemStoreScanner is used instead we need to decrease the - // scanner counters. - private KeyValueScanner flushingScanner = null; - public CompositeImmutableSegment(CellComparator comparator, List segments) { super(comparator); this.comparator = comparator; @@ -78,24 +71,22 @@ public class CompositeImmutableSegment extends ImmutableSegment { } /** - * Builds a special scanner for the MemStoreSnapshot object that is different than the - * general segment scanner. + * Builds a special scanner for the MemStoreSnapshot object that is different than the general + * segment scanner. * @return a special scanner for the MemStoreSnapshot object */ - public KeyValueScanner getKeyValueScanner() { + @Override + public KeyValueScanner getSnapshotScanner() { KeyValueScanner scanner; List list = new ArrayList(segments.size()); for (ImmutableSegment s : segments) { list.add(s.getScanner(Long.MAX_VALUE)); } - try { scanner = new MemStoreScanner(getComparator(), list); } catch (IOException ie) { throw new IllegalStateException(ie); } - - flushingScanner = scanner; return scanner; } @@ -144,10 +135,6 @@ public class CompositeImmutableSegment extends ImmutableSegment { * Closing a segment before it is being discarded */ public void close() { - if (flushingScanner != null) { - flushingScanner.close(); - flushingScanner = null; - } for (ImmutableSegment s : segments) { s.close(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 547d332..f04886e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -27,18 +27,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.util.CollectionBackedScanner; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, * and is not needed for a {@link MutableSegment}. Specifically, the method - * {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the + * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the * {@link MemStoreSnapshot} object. */ @InterfaceAudience.Private @@ -139,8 +137,8 @@ public class ImmutableSegment extends Segment { * general segment scanner. * @return a special scanner for the MemStoreSnapshot object */ - public KeyValueScanner getKeyValueScanner() { - return new CollectionBackedScanner(getCellSet(), getComparator()); + public KeyValueScanner getSnapshotScanner() { + return new SnapshotScanner(this); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 74d1e17..61e7876 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -40,7 +40,7 @@ public class MemStoreSnapshot { this.dataSize = snapshot.keySize(); this.heapOverhead = snapshot.heapOverhead(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanner = snapshot.getKeyValueScanner(); + this.scanner = snapshot.getSnapshotScanner(); this.tagsPresent = snapshot.isTagsPresent(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 7803f7d..a60f047 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -41,14 +41,14 @@ public class SegmentScanner implements KeyValueScanner { private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE; // the observed structure - private final Segment segment; + protected final Segment segment; // the highest relevant MVCC private long readPoint; // the current iterator that can be reinitialized by // seek(), backwardSeek(), or reseek() - private Iterator iter; + protected Iterator iter; // the pre-calculated cell to be returned by peek() - private Cell current = null; + protected Cell current = null; // or next() // A flag represents whether could stop skipping KeyValues for MVCC // if have encountered the next row. Only used for reversed scan @@ -57,7 +57,7 @@ public class SegmentScanner implements KeyValueScanner { private Cell last = null; // flag to indicate if this scanner is closed - private boolean closed = false; + protected boolean closed = false; protected SegmentScanner(Segment segment, long readPoint) { this(segment, readPoint, DEFAULT_SCANNER_ORDER); @@ -74,7 +74,7 @@ public class SegmentScanner implements KeyValueScanner { this.segment.incScannerCount(); iter = segment.iterator(); // the initialization of the current is required for working with heap of SegmentScanners - current = getNext(); + getNext(); this.scannerOrder = scannerOrder; if (current == null) { // nothing to fetch from this scanner @@ -108,7 +108,7 @@ public class SegmentScanner implements KeyValueScanner { return null; } Cell oldCurrent = current; - current = getNext(); // update the currently observed Cell + getNext(); // update the currently observed Cell return oldCurrent; } @@ -127,13 +127,17 @@ public class SegmentScanner implements KeyValueScanner { return false; } // restart the iterator from new key - iter = segment.tailSet(cell).iterator(); + iter = getIterator(cell); // last is going to be reinitialized in the next getNext() call last = null; - current = getNext(); + getNext(); return (current != null); } + protected Iterator getIterator(Cell cell) { + return segment.tailSet(cell).iterator(); + } + /** * Reseek the scanner at or after the specified KeyValue. * This method is guaranteed to seek at or after the required key only if the @@ -156,8 +160,8 @@ public class SegmentScanner implements KeyValueScanner { get it. So we remember the last keys we iterated to and restore the reseeked set to at least that point. */ - iter = segment.tailSet(getHighest(cell, last)).iterator(); - current = getNext(); + iter = getIterator(getHighest(cell, last)); + getNext(); return (current != null); } @@ -355,7 +359,7 @@ public class SegmentScanner implements KeyValueScanner { * Private internal method for iterating over the segment, * skipping the cells with irrelevant MVCC */ - private Cell getNext() { + protected Cell getNext() { Cell startKV = current; Cell next = null; @@ -363,22 +367,25 @@ public class SegmentScanner implements KeyValueScanner { while (iter.hasNext()) { next = iter.next(); if (next.getSequenceId() <= this.readPoint) { - return next; // skip irrelevant versions + current = next; + return current;// skip irrelevant versions } if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the startKV != null && // boundaries of a single row segment.compareRows(next, startKV) > 0) { - return null; + current = null; + return current; } } // end of while - return null; // nothing found + current = null; // nothing found } finally { if (next != null) { // in all cases, remember the last KV we iterated to, needed for reseek() last = next; } } + return current; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java new file mode 100644 index 0000000..d92acf4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +/** + * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells + * in the segment + */ +@InterfaceAudience.Private +public class SnapshotScanner extends SegmentScanner { + + public SnapshotScanner(Segment immutableSegment) { + // Snapshot scanner does not need readpoint. It should read all the cells in the + // segment + super(immutableSegment, Long.MAX_VALUE); + } + + @Override + public Cell peek() { // sanity check, the current should be always valid + if (closed) { + return null; + } + return current; + } + + @Override + public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { + return true; + } + + @Override + public boolean backwardSeek(Cell key) throws IOException { + throw new NotImplementedException( + "backwardSeek must not be called on a " + "non-reversed scanner"); + } + + @Override + public boolean seekToPreviousRow(Cell key) throws IOException { + throw new NotImplementedException( + "seekToPreviousRow must not be called on a " + "non-reversed scanner"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException( + "seekToLastRow must not be called on a " + "non-reversed scanner"); + } + + @Override + protected Iterator getIterator(Cell cell) { + return segment.iterator(); + } + + @Override + protected Cell getNext() { + Cell oldCurrent = current; + if (iter.hasNext()) { + current = iter.next(); + } else { + current = null; + } + return oldCurrent; + } + + @Override + public boolean seek(Cell seekCell) { + // restart iterator + iter = getIterator(seekCell); + return reseek(seekCell); + } + + @Override + public boolean reseek(Cell seekCell) { + while (iter.hasNext()) { + Cell next = iter.next(); + int ret = segment.getComparator().compare(next, seekCell); + if (ret >= 0) { + current = next; + return true; + } + } + return false; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 0c1880c..58a017f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -399,6 +399,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -439,6 +441,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { List scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -466,6 +470,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } @@ -533,6 +539,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Creating another snapshot MemStoreSnapshot snapshot = memstore.snapshot(); + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); snapshot = memstore.snapshot(); @@ -547,6 +555,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 56ae72e..0203758 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -387,6 +387,108 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } } + @Override + @Test + public void testPuttingBackChunksWithOpeningScanner() throws IOException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] qf6 = Bytes.toBytes("testqualifier6"); + byte[] qf7 = Bytes.toBytes("testqualifier7"); + byte[] val = Bytes.toBytes("testval"); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val), null); + memstore.add(new KeyValue(row, fam, qf2, val), null); + memstore.add(new KeyValue(row, fam, qf3, val), null); + + // Creating a snapshot + MemStoreSnapshot snapshot = memstore.snapshot(); + assertEquals(3, memstore.getSnapshot().getCellsCount()); + + // Adding value to "new" memstore + assertEquals(0, memstore.getActive().getCellsCount()); + memstore.add(new KeyValue(row, fam, qf4, val), null); + memstore.add(new KeyValue(row, fam, qf5, val), null); + assertEquals(2, memstore.getActive().getCellsCount()); + + // opening scanner before clear the snapshot + List scanners = memstore.getScanners(0); + // Shouldn't putting back the chunks to pool,since some scanners are opening + // based on their data + // close the scanner + snapshot.getScanner().close(); + memstore.clearSnapshot(snapshot.getId()); + + assertTrue(chunkPool.getPoolSize() == 0); + + // Chunks will be put back to pool after close scanners; + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + assertTrue(chunkPool.getPoolSize() > 0); + + // clear chunks + chunkPool.clearChunks(); + + // Creating another snapshot + + snapshot = memstore.snapshot(); + // Adding more value + memstore.add(new KeyValue(row, fam, qf6, val), null); + memstore.add(new KeyValue(row, fam, qf7, val), null); + // opening scanners + scanners = memstore.getScanners(0); + // close scanners before clear the snapshot + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + // Since no opening scanner, the chunks of snapshot should be put back to + // pool + // close the scanner + snapshot.getScanner().close(); + memstore.clearSnapshot(snapshot.getId()); + assertTrue(chunkPool.getPoolSize() > 0); + } + + @Test + public void testPuttingBackChunksAfterFlushing() throws IOException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] val = Bytes.toBytes("testval"); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val), null); + memstore.add(new KeyValue(row, fam, qf2, val), null); + memstore.add(new KeyValue(row, fam, qf3, val), null); + + // Creating a snapshot + MemStoreSnapshot snapshot = memstore.snapshot(); + assertEquals(3, memstore.getSnapshot().getCellsCount()); + + // Adding value to "new" memstore + assertEquals(0, memstore.getActive().getCellsCount()); + memstore.add(new KeyValue(row, fam, qf4, val), null); + memstore.add(new KeyValue(row, fam, qf5, val), null); + assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanner + snapshot.getScanner().close(); + memstore.clearSnapshot(snapshot.getId()); + + int chunkCount = chunkPool.getPoolSize(); + assertTrue(chunkCount > 0); + } + + private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) { byte[] fam = Bytes.toBytes("testfamily"); byte[] qf = Bytes.toBytes("testqualifier"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index e2ba169..49b5139 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -135,6 +135,8 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanner - this is how the snapshot will be used + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -177,6 +179,8 @@ public class TestMemStoreChunkPool { List scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data + // close the snapshot scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -203,6 +207,8 @@ public class TestMemStoreChunkPool { } // Since no opening scanner, the chunks of snapshot should be put back to // pool + // close the snapshot scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); }