.../regionserver/MemStoreCompactorIterator.java | 3 +- .../hadoop/hbase/regionserver/MemStoreScanner.java | 253 +++++++++------------ .../hadoop/hbase/regionserver/SegmentScanner.java | 4 +- .../hadoop/hbase/client/TestFromClientSide.java | 1 - .../TestCompactingToCellArrayMapMemStore.java | 26 +-- .../hadoop/hbase/regionserver/TestHRegion.java | 2 - 6 files changed, 115 insertions(+), 174 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java index 2eafb42..9798ec2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java @@ -65,7 +65,7 @@ public class MemStoreCompactorIterator implements Iterator { scanners.add(segment.getScanner(store.getSmallestReadPoint())); } - scanner = new MemStoreScanner(comparator, scanners, MemStoreScanner.Type.COMPACT_FORWARD); + scanner = new MemStoreScanner(comparator, scanners, true); // reinitialize the compacting scanner for each instance of iterator compactingScanner = createScanner(store, scanner); @@ -101,7 +101,6 @@ public class MemStoreCompactorIterator implements Iterator { public void close() { compactingScanner.close(); compactingScanner = null; - scanner.close(); scanner = null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index 74d061c..e180aeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -24,6 +24,8 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import 
org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; @@ -37,24 +39,12 @@ import org.apache.htrace.Trace; */ @InterfaceAudience.Private public class MemStoreScanner extends NonLazyKeyValueScanner { - /** - * Types of cell MemStoreScanner - */ - static public enum Type { - UNDEFINED, - COMPACT_FORWARD, - USER_SCAN_FORWARD, - USER_SCAN_BACKWARD - } - // heap of scanners used for traversing forward - private KeyValueHeap forwardHeap; - // reversed scanners heap for traversing backward - private ReversedKeyValueHeap backwardHeap; + // heap of scanners, lazily initialized + private KeyValueHeap heap; - // The type of the scan is defined by constructor - // or according to the first usage - private Type type = Type.UNDEFINED; + // indicates if the scanner is created for inmemoryCompaction + private boolean inmemoryCompaction; // remember the initial version of the scanners list List scanners; @@ -62,42 +52,74 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { private final CellComparator comparator; /** - * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! - * After constructor only one heap is going to be initialized for entire lifespan - * of the MemStoreScanner. A specific scanner can only be one directional! 
- * + * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan + * and the heap is lazily initialized * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - * @param type The scan type COMPACT_FORWARD should be used for compaction + * @param scanners List of scanners, from which the heap will be built + * @param inmemoryCompaction true if used for inmemoryCompaction */ - public MemStoreScanner(CellComparator comparator, List scanners, Type type) - throws IOException { + public MemStoreScanner(CellComparator comparator, List scanners, + boolean inmemoryCompaction) throws IOException { super(); - this.type = type; - switch (type) { - case UNDEFINED: - case USER_SCAN_FORWARD: - case COMPACT_FORWARD: - this.forwardHeap = new KeyValueHeap(scanners, comparator); - break; - case USER_SCAN_BACKWARD: - this.backwardHeap = new ReversedKeyValueHeap(scanners, comparator); - break; - default: - throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner"); - } this.comparator = comparator; this.scanners = scanners; if (Trace.isTracing() && Trace.currentSpan() != null) { Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); } + this.inmemoryCompaction = inmemoryCompaction; + if (inmemoryCompaction) { + // init the forward scanner in case of inmemoryCompaction + initForwardKVHeapIfNeeded(comparator, scanners); + } } - /* Constructor used only when the scan usage is unknown - and need to be defined according to the first move */ + /** + * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan + * and the heap is lazily initialized + * @param comparator Cell Comparator + * @param scanners List of scanners, from which the heap will be built + */ public MemStoreScanner(CellComparator comparator, List scanners) throws IOException { - this(comparator, scanners, Type.UNDEFINED); + this(comparator, scanners, false); + } + + private 
void initForwardKVHeapIfNeeded(CellComparator comparator, List scanners) + throws IOException { + if (heap == null) { + // lazy init + // In a normal scan case, at the StoreScanner level before the KVHeap is + // created we do a seek or reseek. So that will happen + // on all the scanners that the StoreScanner is + // made of. So when we get any of those calls to this scanner we init the + // heap here with normal forward KVHeap. + this.heap = new KeyValueHeap(scanners, comparator); + } + } + + private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator, + List scanners) throws IOException { + boolean res = false; + if (heap == null) { + // lazy init + // In a normal reverse scan case, at the ReversedStoreScanner level before the + // ReversedKeyValueHeap is + // created we do a seekToLastRow or backwardSeek. So that will happen + // on all the scanners that the ReversedStoreScanner is + // made of. So when we get any of those calls to this scanner we init the + // heap here with ReversedKVHeap. + if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) { + for (KeyValueScanner scanner : scanners) { + res |= scanner.seekToLastRow(); + } + } else { + for (KeyValueScanner scanner : scanners) { + res |= scanner.backwardSeek(seekKey); + } + } + this.heap = new ReversedKeyValueHeap(scanners, comparator); + } + return res; } /** @@ -105,30 +127,29 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * The backward traversal is assumed, only if specified explicitly */ @Override - public synchronized Cell peek() { - if (type == Type.USER_SCAN_BACKWARD) { - return backwardHeap.peek(); + public Cell peek() { + if (this.heap != null) { + return this.heap.peek(); } - return forwardHeap.peek(); + // Doing it this way to avoid an NPE in case some test cases try to peek directly + return null; } /** * Gets the next cell from the top-most scanner. Assumed forward scanning.
*/ @Override - public synchronized Cell next() throws IOException { - KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap; - - // loop over till the next suitable value - // take next value from the heap - for (Cell currentCell = heap.next(); - currentCell != null; - currentCell = heap.next()) { - - // all the logic of presenting cells is inside the internal KeyValueScanners - // located inside the heap - - return currentCell; + public Cell next() throws IOException { + if(this.heap != null) { + // loop over till the next suitable value + // take next value from the heap + for (Cell currentCell = heap.next(); + currentCell != null; + currentCell = heap.next()) { + // all the logic of presenting cells is inside the internal KeyValueScanners + // located inside the heap + return currentCell; + } } return null; } @@ -142,15 +163,15 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * @return false if the key is null or if there is no data */ @Override - public synchronized boolean seek(Cell cell) throws IOException { - assertForward(); + public boolean seek(Cell cell) throws IOException { + initForwardKVHeapIfNeeded(comparator, scanners); if (cell == null) { close(); return false; } - return forwardHeap.seek(cell); + return heap.seek(cell); } /** @@ -160,7 +181,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * @return true if there is at least one KV to read, false otherwise */ @Override - public synchronized boolean reseek(Cell cell) throws IOException { + public boolean reseek(Cell cell) throws IOException { /* * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. * This code is executed concurrently with flush and puts, without locks. 
@@ -175,8 +196,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * * TODO: The above comment copied from the original MemStoreScanner */ - assertForward(); - return forwardHeap.reseek(cell); + initForwardKVHeapIfNeeded(comparator, scanners); + return heap.reseek(cell); } /** @@ -190,21 +211,16 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { } @Override - public synchronized void close() { - - if (forwardHeap != null) { - assert ((type == Type.USER_SCAN_FORWARD) || - (type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED)); - forwardHeap.close(); - forwardHeap = null; - if (backwardHeap != null) { - backwardHeap.close(); - backwardHeap = null; + public void close() { + // Ensuring that all the segment scanners are closed + if (heap != null) { + heap.close(); + // It is safe to do close as no new calls will be made to this scanner. + heap = null; + } else { + for (KeyValueScanner scanner : scanners) { + scanner.close(); } - } else if (backwardHeap != null) { - assert (type == Type.USER_SCAN_BACKWARD); - backwardHeap.close(); - backwardHeap = null; } } @@ -215,9 +231,11 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * @return false if the key is null or if there is no data */ @Override - public synchronized boolean backwardSeek(Cell cell) throws IOException { - initBackwardHeapIfNeeded(cell, false); - return backwardHeap.backwardSeek(cell); + public boolean backwardSeek(Cell cell) throws IOException { + // The first time when this happens it sets the scanners to the seek key + // passed by the incoming scan's start row + initReverseKVHeapIfNeeded(cell, comparator, scanners); + return heap.backwardSeek(cell); } /** @@ -227,22 +245,17 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * @return false if the key is null or if there is no data */ @Override - public synchronized boolean seekToPreviousRow(Cell cell) throws IOException { - initBackwardHeapIfNeeded(cell, false); - if (backwardHeap.peek() == 
null) { + public boolean seekToPreviousRow(Cell cell) throws IOException { + initReverseKVHeapIfNeeded(cell, comparator, scanners); + if (heap.peek() == null) { restartBackwardHeap(cell); } - return backwardHeap.seekToPreviousRow(cell); + return heap.seekToPreviousRow(cell); } @Override - public synchronized boolean seekToLastRow() throws IOException { - // TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't - // implement seekToLastRow() method :( - // however seekToLastRow() was implemented in internal MemStoreScanner - // so I wonder whether we need to come with our own workaround, or to update - // ReversedKeyValueHeap - return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true); + public boolean seekToLastRow() throws IOException { + return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners); } /** @@ -250,9 +263,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * @return False if the key definitely does not exist in this Memstore */ @Override - public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - - if (type == Type.COMPACT_FORWARD) { + public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { + // TODO : Check if this can be removed. + if (inmemoryCompaction) { return true; } @@ -286,58 +299,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { for (KeyValueScanner scan : scanners) { res |= scan.seekToPreviousRow(cell); } - this.backwardHeap = + this.heap = new ReversedKeyValueHeap(scanners, comparator); return res; } - - /** - * Checks whether the type of the scan suits the assumption of moving backward - */ - private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException { - boolean res = false; - if (toLast && (type != Type.UNDEFINED)) { - throw new IllegalStateException( - "Wrong usage of initBackwardHeapIfNeeded in parameters. 
The type is:" + type.toString()); - } - if (type == Type.UNDEFINED) { - // In case we started from peek, release the forward heap - // and build backward. Set the correct type. Thus this turn - // can happen only once - if ((backwardHeap == null) && (forwardHeap != null)) { - forwardHeap.close(); - forwardHeap = null; - // before building the heap seek for the relevant key on the scanners, - // for the heap to be built from the scanners correctly - for (KeyValueScanner scan : scanners) { - if (toLast) { - res |= scan.seekToLastRow(); - } else { - res |= scan.backwardSeek(cell); - } - } - this.backwardHeap = - new ReversedKeyValueHeap(scanners, comparator); - type = Type.USER_SCAN_BACKWARD; - } - } - - if (type == Type.USER_SCAN_FORWARD) { - throw new IllegalStateException("Traversing backward with forward scan"); - } - return res; - } - - /** - * Checks whether the type of the scan suits the assumption of moving forward - */ - private void assertForward() throws IllegalStateException { - if (type == Type.UNDEFINED) { - type = Type.USER_SCAN_FORWARD; - } - - if (type == Type.USER_SCAN_BACKWARD) { - throw new IllegalStateException("Traversing forward with backward scan"); - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 8cf0a7c..92c3443 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -67,11 +67,11 @@ public class SegmentScanner implements KeyValueScanner { protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) { this.segment = segment; this.readPoint = readPoint; + //increase the reference count so the underlying structure will not be de-allocated + this.segment.incScannerCount(); iter = segment.iterator(); // the initialization of the current is required for 
working with heap of SegmentScanners current = getNext(); - //increase the reference count so the underlying structure will not be de-allocated - this.segment.incScannerCount(); this.scannerOrder = scannerOrder; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 33a5315..f10cce3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index fefe2c1..f89a040 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -22,12 +22,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.testclassification.MediumTests; import 
org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -36,9 +33,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.util.ArrayList; import java.util.List; @@ -281,13 +275,14 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore Threads.sleep(10); } List scanners = memstore.getScanners(Long.MAX_VALUE); - MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners); + // seek + scanners.get(0).seek(KeyValue.LOWESTKEY); int count = 0; - while (scanner.next() != null) { + while (scanners.get(0).next() != null) { count++; } assertEquals("the count should be ", count, 150); - scanner.close(); + scanners.get(0).close(); } @Test @@ -345,17 +340,4 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);// } - - private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { - long t = 1234; - - @Override public long currentTime() { - return t; - } - - public void setCurrentTimeMillis(long t) { - this.t = t; - } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2042f52..80d220d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -90,7 +90,6 @@ import 
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; @@ -103,7 +102,6 @@ import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor;