From 6a00af5cf5549e75f5567030d1326bea141cd2f6 Mon Sep 17 00:00:00 2001
From: eshcar
Date: Thu, 26 Jan 2017 11:14:26 +0200
Subject: [PATCH] HBASE-17339: Scan-Memory-First Optimization for Get Operations

---
 .../java/org/apache/hadoop/hbase/client/Get.java   | 50 +++++++++++
 .../hbase/regionserver/AbstractMemStore.java       | 49 +++++++----
 .../hbase/regionserver/CompactingMemStore.java     |  5 +-
 .../regionserver/CompositeImmutableSegment.java    | 22 ++++-
 .../hadoop/hbase/regionserver/DefaultMemStore.java |  2 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 37 ++++++--
 .../apache/hadoop/hbase/regionserver/HStore.java   | 13 ++-
 .../hbase/regionserver/ImmutableSegment.java       |  5 ++
 .../apache/hadoop/hbase/regionserver/MemStore.java |  8 ++
 .../hadoop/hbase/regionserver/MutableSegment.java  |  4 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 98 +++++++++++++++++----
 .../hadoop/hbase/regionserver/RegionScanner.java   |  8 ++
 .../apache/hadoop/hbase/regionserver/Segment.java  |  1 +
 .../apache/hadoop/hbase/regionserver/Store.java    |  8 ++
 .../hadoop/hbase/regionserver/StoreScanner.java    |  4 +
 15 files changed, 263 insertions(+), 51 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index 947b54a..bb8f1ab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -31,6 +32,8 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -533,4 +536,51 @@ public class Get extends Query
     return (Get) super.setIsolationLevel(level);
   }
 
+  public boolean shouldApplyMemoryScanOptimization() {
+    // TODO add a flag which indicates if the user wants to apply the optimization
+    for (NavigableSet<byte[]> columns : familyMap.values()) {
+      if (columns == null) return false; // no explicit set of columns -- cannot apply optimization
+    }
+    return true;
+  }
+
+  public boolean satisfiedWith(List<Cell> results) {
+    if (!shouldApplyMemoryScanOptimization()) return false;
+    Bytes[] columns = getColumns();
+    int[] counters = new int[columns.length];
+    // count #cells per qualifier in the list of columns
+    for (Cell cell : results) {
+      int index = 0;
+      for (Bytes col : columns) {
+        if (CellComparator.compareQualifiers(cell, col.get(), col.getOffset(), col.getLength())
+            == 0) {
+          counters[index]++;
+        }
+        index++;
+      }
+    }
+    // verify that each qualifier has a sufficient number of versions, as defined by the get
+    for (int i = 0; i < counters.length; i++) {
+      if (counters[i] < maxVersions) {
+        return false; // not enough versions
+      }
+    }
+    return true; // the get operation is satisfied with the result
+  }
+
+  // returns the set of all column qualifiers requested by the get operation
+  private Bytes[] getColumns() {
+    Set<byte[]> allColumns = new HashSet<>();
+    for (NavigableSet<byte[]> columns : familyMap.values()) {
+      if (columns != null) {
+        allColumns.addAll(columns);
+      }
+    }
+    Bytes[] res = new Bytes[allColumns.size()];
+    int i = 0;
+    for (byte[] col : allColumns) {
+      res[i++] = new Bytes(col);
+    }
+    return res;
+  }
 }
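For context, a minimal sketch of when the new predicate holds, using the standard HBase client API (the row, family and qualifier names here are made up for illustration):

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MemoryFirstEligibilityDemo {
      public static void main(String[] args) throws Exception {
        byte[] f = Bytes.toBytes("f");

        // Every family maps to an explicit qualifier set, so
        // shouldApplyMemoryScanOptimization() would return true for this Get.
        Get narrow = new Get(Bytes.toBytes("row1"));
        narrow.addColumn(f, Bytes.toBytes("q1"));
        narrow.addColumn(f, Bytes.toBytes("q2"));

        // A whole-family Get maps the family to a null qualifier set, so the
        // predicate would return false: without the qualifiers known up front,
        // the server cannot prove that the memstore alone covers the request.
        Get wide = new Get(Bytes.toBytes("row1"));
        wide.addFamily(f);
      }
    }

satisfiedWith(results) then completes the check at read time: it holds only if every requested qualifier appears in the memory-only result at least maxVersions times, i.e. the memstore provably contains all the versions the get may return.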
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 225dd73..413a0da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -53,6 +53,8 @@ public abstract class AbstractMemStore implements MemStore {
   protected volatile long snapshotId;
   // Used to track when to flush
   private volatile long timeOfOldestEdit;
+  // used to check if all timestamps in memstore are strictly greater than timestamps in files
+  private volatile long maxFlushedTimestamp;
 
   public final static long FIXED_OVERHEAD = ClassSize
       .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
@@ -65,6 +67,8 @@ public abstract class AbstractMemStore implements MemStore {
     resetActive();
     this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
     this.snapshotId = NO_SNAPSHOT_ID;
+    // TODO init this properly when recovering -- need to traverse all existing store files
+    this.maxFlushedTimestamp = 0;
   }
 
   protected void resetActive() {
@@ -121,6 +125,24 @@
   }
 
   /**
+   * A store preserves monotonicity if all timestamps in memstore are strictly greater than all
+   * timestamps in store files.
+   * @return maximal timestamp that was flushed to disk in this store, or null if monotonicity is
+   *         not preserved
+   */
+  @Override
+  public Long getMaxFlushedTimestamp() {
+    List<Segment> segments = getSegments();
+    for (Segment segment : segments) {
+      if (segment.getMinTimestamp() < maxFlushedTimestamp) {
+        // timestamp overlap -- not monotonic
+        return null;
+      }
+    }
+    return maxFlushedTimestamp;
+  }
+
+  /**
    * @return Oldest timestamp of all the Cells in the MemStore
    */
   @Override
@@ -135,18 +157,19 @@
    */
   @Override
   public void clearSnapshot(long id) throws UnexpectedStateException {
-    if (this.snapshotId == -1) return; // already cleared
-    if (this.snapshotId != id) {
-      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
-          + id);
+    if (snapshotId == -1) return; // already cleared
+    if (snapshotId != id) {
+      throw new UnexpectedStateException("Current snapshot id is " + snapshotId + ",passed " + id);
     }
     // OK. Passed in snapshot is same as current snapshot. If not-empty,
     // create a new snapshot and let the old one go.
     Segment oldSnapshot = this.snapshot;
-    if (!this.snapshot.isEmpty()) {
-      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
+    if (!snapshot.isEmpty()) {
+      // The magic moment to maintain the maximal flushed ts property
+      maxFlushedTimestamp = Math.max(maxFlushedTimestamp, snapshot.getMaxTimestamp());
+      snapshot = SegmentFactory.instance().createImmutableSegment(comparator);
     }
-    this.snapshotId = NO_SNAPSHOT_ID;
+    snapshotId = NO_SNAPSHOT_ID;
     oldSnapshot.close();
   }
@@ -159,13 +182,9 @@
   public String toString() {
     StringBuffer buf = new StringBuffer();
    int i = 1;
-    try {
-      for (Segment segment : getSegments()) {
-        buf.append("Segment (" + i + ") " + segment.toString() + "; ");
-        i++;
-      }
-    } catch (IOException e){
-      return e.toString();
+    for (Segment segment : getSegments()) {
+      buf.append("Segment (" + i + ") " + segment.toString() + "; ");
+      i++;
     }
     return buf.toString();
   }
@@ -307,6 +326,6 @@
   /**
    * @return an ordered list of segments from most recent to oldest in memstore
    */
-  protected abstract List<Segment> getSegments() throws IOException;
+  protected abstract List<Segment> getSegments();
 
 }
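To make the invariant concrete, here is a self-contained toy model of the bookkeeping above; the class and method names are hypothetical, with onFlushCompleted standing in for clearSnapshot() and maxFlushedIfMonotonic for getMaxFlushedTimestamp():

    // Toy model of the monotonicity bookkeeping (not HBase code).
    public class MonotonicityDemo {
      static long maxFlushedTimestamp = 0;

      // Called when a flushed snapshot is cleared, as in clearSnapshot() above.
      static void onFlushCompleted(long snapshotMaxTimestamp) {
        maxFlushedTimestamp = Math.max(maxFlushedTimestamp, snapshotMaxTimestamp);
      }

      // Mirrors getMaxFlushedTimestamp(): null means monotonicity is broken.
      static Long maxFlushedIfMonotonic(long[] inMemorySegmentMinTimestamps) {
        for (long minTs : inMemorySegmentMinTimestamps) {
          if (minTs < maxFlushedTimestamp) {
            return null; // some in-memory cell is older than a flushed cell
          }
        }
        return maxFlushedTimestamp;
      }

      public static void main(String[] args) {
        onFlushCompleted(100); // cells up to ts=100 were flushed to disk
        System.out.println(maxFlushedIfMonotonic(new long[] { 101, 150 })); // 100 -- monotonic
        System.out.println(maxFlushedIfMonotonic(new long[] { 95, 150 }));  // null -- overlap at 95
      }
    }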
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 48dc880..ca507a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -295,8 +295,7 @@ public class CompactingMemStore extends AbstractMemStore {
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     List<? extends Segment> pipelineList = pipeline.getSegments();
     int order = pipelineList.size() + snapshot.getNumOfSegments();
-    // The list of elements in pipeline + the active element + the snapshot segment
-    // TODO : This will change when the snapshot is made of more than one element
+    // The list of elements in pipeline + the active element + the snapshot segments
     // The order is the Segment ordinal
     List<SegmentScanner> list = new ArrayList<SegmentScanner>(order+1);
     list.add(this.active.getScanner(readPt, order + 1));
@@ -308,6 +307,8 @@
       list.add(item.getScanner(readPt, order));
       order--;
     }
+    // TODO check if we can change the implementation to return multiple scanners,
+    // so we can later filter out individual scanners rather than keep or eliminate all of them
     return Collections.<KeyValueScanner>singletonList(new MemStoreScanner(getComparator(), list));
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
index 30d17fb..2705f89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -131,10 +131,6 @@
     throw new IllegalStateException("Not supported by CompositeImmutableScanner");
   }
 
-  public long getMinTimestamp(){
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
   /**
    * Creates the scanner for the given read point
    * @return a scanner for the given read point
@@ -247,6 +243,24 @@
     throw new IllegalStateException("Not supported by CompositeImmutableScanner");
   }
 
+  @Override
+  public long getMaxTimestamp() {
+    long max = Long.MIN_VALUE;
+    for (Segment s : segments) {
+      max = Math.max(max, s.getMaxTimestamp());
+    }
+    return max;
+  }
+
+  @Override
+  public long getMinTimestamp() {
+    long min = Long.MAX_VALUE;
+    for (Segment s : segments) {
+      min = Math.min(min, s.getMinTimestamp());
+    }
+    return min;
+  }
+
   /**
    * @return a set of all cells in the segment
    */

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 63af570..cba0991 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -134,7 +134,7 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   @Override
-  protected List<Segment> getSegments() throws IOException {
+  protected List<Segment> getSegments() {
     List<Segment> list = new ArrayList<Segment>(2);
     list.add(this.active);
     list.add(this.snapshot);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ef6239d..48f542d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -104,7 +104,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -2767,12 +2766,6 @@
   }
 
   protected RegionScanner instantiateRegionScanner(Scan scan,
-      List<KeyValueScanner> additionalScanners) throws IOException {
-    return instantiateRegionScanner(scan, additionalScanners, HConstants.NO_NONCE,
-        HConstants.NO_NONCE);
-  }
-
-  protected RegionScanner instantiateRegionScanner(Scan scan,
       List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
     if (scan.isReversed()) {
       if (scan.getFilter() != null) {
@@ -5780,13 +5773,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
     private final long maxResultSize;
     private final ScannerContext defaultScannerContext;
     private final FilterWrapper filter;
+    private Map<byte[], Long> storesMaxFlushedTimestamp;
 
     @Override
     public HRegionInfo getRegionInfo() {
       return region.getRegionInfo();
     }
 
-    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
+    protected RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
       this(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
@@ -5831,6 +5825,7 @@
 
     protected void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
         throws IOException {
+      storesMaxFlushedTimestamp = new HashMap<>(scan.getFamilyMap().size());
       // Here we separate all scanners into two lists - scanner that provide data required
       // by the filter to operate (scanners list) and all others (joinedScanners list).
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
@@ -5850,6 +5845,7 @@
         KeyValueScanner scanner;
         try {
           scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+          storesMaxFlushedTimestamp.put(entry.getKey(), store.getMaxFlushedTimestamp());
         } catch (FileNotFoundException e) {
           throw handleFileNotFound(e);
         }
@@ -6432,6 +6428,31 @@
     }
 
     @Override
+    public boolean testTSMonotonicity() {
+      assert storesMaxFlushedTimestamp != null;
+      for (Long ts : storesMaxFlushedTimestamp.values()) {
+        if (ts == null) {
+          return false; // null indicates the store does not preserve ts monotonicity
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public boolean recheckTSMonotonicity(InternalScan scan) {
+      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
+        Store store = stores.get(entry.getKey());
+        Long collectedTS = storesMaxFlushedTimestamp.get(entry.getKey());
+        assert collectedTS != null;
+        if (!collectedTS.equals(store.getMaxFlushedTimestamp())) {
+          return false;
+        }
+      }
+      // the double-collect saw the same max flushed ts in all stores as the first collect
+      return true;
+    }
+
+    @Override
     public void run() throws IOException {
       // This is the RPC callback method executed. We do the close in of the scanner in this
       // callback
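testTSMonotonicity()/recheckTSMonotonicity() form an optimistic validate-read-revalidate pair: the max flushed timestamps are collected while the scanners are opened, the memstore is read, and the collection is then repeated to verify that no flush raced the read. A self-contained sketch of this pattern (all names hypothetical; a real flush would advance the counter concurrently):

    import java.util.concurrent.atomic.AtomicLong;

    public class DoubleCollectDemo {
      // Stands in for a store's max flushed timestamp.
      static final AtomicLong maxFlushedTs = new AtomicLong(100);

      public static void main(String[] args) {
        long before = maxFlushedTs.get();      // collect #1: while opening the scanners
        String result = readFromMemoryOnly();  // optimistic memstore-only read
        long after = maxFlushedTs.get();       // collect #2: after the read
        if (before == after) {
          System.out.println("memory-only result is safe to return: " + result);
        } else {
          System.out.println("a flush raced the read; fall back to a full scan");
        }
      }

      static String readFromMemoryOnly() {
        return "cells";
      }
    }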
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ad23ce0..9978d37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2515,7 +2515,18 @@ public class HStore implements Store {
 
   @Override
   public boolean isSloppyMemstore() {
-    return this.memstore.isSloppy();
+    return memstore.isSloppy();
+  }
+
+  /**
+   * A store preserves monotonicity if all timestamps in memstore are strictly greater than all
+   * timestamps in store files.
+   * @return maximal timestamp that was flushed to disk in this store, or null if monotonicity is
+   *         not preserved
+   */
+  @Override
+  public Long getMaxFlushedTimestamp() {
+    return memstore.getMaxFlushedTimestamp();
   }
 
   private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index faa9b67..76b7e84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -152,6 +152,11 @@ public class ImmutableSegment extends Segment {
     return this.timeRange.getMin();
   }
 
+  @Override
+  public long getMaxTimestamp() {
+    return this.timeRange.getMax();
+  }
+
   public int getNumOfSegments() {
     return 1;
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 38d3e44..96ff3ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -127,4 +127,12 @@ public interface MemStore {
 
   /* Return true if the memstore may use some extra memory space*/
   boolean isSloppy();
+
+  /**
+   * A store preserves monotonicity if all timestamps in memstore are strictly greater than all
+   * timestamps in store files.
+   * @return maximal timestamp that was flushed to disk in this store, or null if monotonicity is
+   *         not preserved
+   */
+  Long getMaxFlushedTimestamp();
 }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index 3dbd7ad..b472f87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -117,6 +117,10 @@ public class MutableSegment extends Segment {
         && (this.timeRangeTracker.getMax() >= oldestUnexpiredTS));
   }
 
+  @Override public long getMaxTimestamp() {
+    return this.timeRangeTracker.getMax();
+  }
+
   @Override public long getMinTimestamp() {
     return this.timeRangeTracker.getMin();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a072dce..8690eaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -38,6 +38,7 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
@@ -601,7 +602,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * Execute an append mutation.
    *
    * @param region
-   * @param m
    * @param cellScanner
    * @return result to return to client if default operation should be
    *         bypassed as indicated by RegionObserver, null otherwise
@@ -1097,6 +1097,8 @@
       REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
       DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
 
+    memoryScanOptimization = rs.conf.getBoolean(MEMORY_SCAN_OPTIMIZATION_KEY, false);
+
     InetSocketAddress address = rpcServer.getListenerAddress();
     if (address == null) {
       throw new IOException("Listener channel is closed");
@@ -2295,8 +2297,17 @@
     }
   }
 
+  private static final AtomicInteger MEMORY_SCANS = new AtomicInteger(0);
+  private static final AtomicInteger FULL_SCANS = new AtomicInteger(0);
+  private static final String MEMORY_SCAN_OPTIMIZATION_KEY = "regionserver.memory.scan.optimization";
+  // TODO make this a table property instead of a global property, and make it true by default
+  private boolean memoryScanOptimization = false;
+
   private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
       RpcCallContext context) throws IOException {
+
+    boolean applyMemoryScanOptimization =
+        memoryScanOptimization && get.shouldApplyMemoryScanOptimization();
     region.prepareGet(get);
     List<Cell> results = new ArrayList<Cell>();
     boolean stale = region.getRegionInfo().getReplicaId() != 0;
@@ -2308,31 +2319,60 @@
       }
     }
     long before = EnvironmentEdgeManager.currentTime();
-    Scan scan = new Scan(get);
-    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
-      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
-    }
     RegionScanner scanner = null;
+    RegionScanner internalScanner = null;
+    boolean monotonic = false;
+    boolean memoryScanExecuted = false;
     try {
-      scanner = region.getScanner(scan);
-      scanner.next(results);
+      int memScansCount = 0;
+      if (applyMemoryScanOptimization) {
+        InternalScan internalScan = new InternalScan(get);
+        internalScan.checkOnlyMemStore();
+        if (internalScan.getLoadColumnFamiliesOnDemandValue() == null) {
+          internalScan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+        }
+        internalScanner = region.getScanner(internalScan);
+        // here internalScanner has a view of the max flushed timestamps of all scanned stores,
+        // and an indication of whether all scanned stores maintain monotonicity
+        if (internalScanner.testTSMonotonicity()) {
+          internalScanner.next(results);
+          memoryScanExecuted = true;
+          memScansCount = MEMORY_SCANS.incrementAndGet();
+          // double-collect the max flushed timestamps
+          monotonic = internalScanner.recheckTSMonotonicity(internalScan);
+        }
+      }
+      if (!applyMemoryScanOptimization
+          || !monotonic // failed the monotonicity test
+          || !get.satisfiedWith(results)) { // the memstore alone holds too few versions
+        if (memoryScanExecuted) {
+          results.clear(); // drop the memory-only results; the full scan re-reads them
+        }
+        Scan scan = new Scan(get);
+        if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
+          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+        }
+        scanner = region.getScanner(scan);
+        scanner.next(results);
+        if (memoryScanOptimization) {
+          int fullScansCount = FULL_SCANS.incrementAndGet();
+          if (fullScansCount % 20000 == 0) {
+            LOG.info("Memory-first get stats: memScansCount=" + memScansCount
+                + " fullScansCount=" + fullScansCount);
+          }
+        }
+      }
     } finally {
       if (scanner != null) {
-        if (closeCallBack == null) {
-          // If there is a context then the scanner can be added to the current
-          // RpcCallContext. The rpc callback will take care of closing the
-          // scanner, for eg in case
-          // of get()
-          assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback;
-          context.setCallBack((RegionScannerImpl) scanner);
-        } else {
-          // The call is from multi() where the results from the get() are
-          // aggregated and then send out to the
-          // rpc. The rpccall back will close all such scanners created as part
-          // of multi().
-          closeCallBack.addScanner(scanner);
-        }
+        // executed a full scan:
+        // (1) add the full scanner to the callback context
+        // (2) close the memory-only scanner
+        addScannerToCallBackContext(closeCallBack, context, scanner);
+        if (internalScanner != null) {
+          internalScanner.close();
         }
+      } else if (internalScanner != null) {
+        // executed only a memory scan:
+        // add the memory-only scanner to the callback context, where it is closed
+        addScannerToCallBackContext(closeCallBack, context, internalScanner);
       }
     }
 
     // post-get CP hook
@@ -2343,6 +2383,24 @@
     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
   }
 
+  private void addScannerToCallBackContext(RegionScannersCloseCallBack closeCallBack,
+      RpcCallContext context, RegionScanner scanner) {
+    if (closeCallBack == null) {
+      // If there is a context then the scanner can be added to the current
+      // RpcCallContext. The rpc callback will take care of closing the
+      // scanner, for example in the case of get().
+      assert scanner instanceof RpcCallback;
+      context.setCallBack((RegionScannerImpl) scanner);
+    } else {
+      // The call is from multi(), where the results from the get() are
+      // aggregated and then sent out to the rpc. The rpc callback will close
+      // all such scanners created as part of multi().
+      closeCallBack.addScanner(scanner);
+    }
+  }
+
   /**
    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
    *
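As wired above, the fast path is off by default and is enabled per region server via the new configuration key. A minimal sketch of flipping it on programmatically, e.g. in a test; in a real deployment the key would normally be set in hbase-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableMemoryScanDemo {
      public static void main(String[] args) {
        // Key name and default (false) as defined in RSRpcServices above.
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("regionserver.memory.scan.optimization", true);
        System.out.println(conf.getBoolean("regionserver.memory.scan.optimization", false));
      }
    }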
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 8d8c051..fafe2af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -124,4 +124,12 @@ public interface RegionScanner extends InternalScanner, Shipper {
   default void shipped() throws IOException {
     // do nothing
   }
+
+  default boolean testTSMonotonicity() {
+    return false;
+  }
+
+  default boolean recheckTSMonotonicity(InternalScan scan) {
+    return false;
+  }
 }
\ No newline at end of file

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 8581517..036f861 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -180,6 +180,7 @@ public abstract class Segment {
 
   public abstract boolean shouldSeek(Scan scan, long oldestUnexpiredTS);
 
+  public abstract long getMaxTimestamp();
   public abstract long getMinTimestamp();
 
   public boolean isTagsPresent() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index bb9e20a..79eed6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -535,4 +535,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
    * @return true if the memstore may need some extra memory space
    */
   boolean isSloppyMemstore();
+
+  /**
+   * A store preserves monotonicity if all timestamps in memstore are strictly greater than all
+   * timestamps in store files.
+   * @return maximal timestamp that was flushed to disk in this store, or null if monotonicity is
+   *         not preserved
+   */
+  Long getMaxFlushedTimestamp();
 }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 7e08eca..41476c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -424,6 +424,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS: Long.MIN_VALUE;
 
     // include only those scan files which pass all filters
+    // TODO check if we can change the implementation to return multiple scanners from a memstore,
+    // so we can filter out (here) each one of them rather than keep or eliminate all of them.
+    // For historical reasons both the default and the compacting MemStore return a singleton
+    // list holding a single MemStoreScanner; perhaps this is no longer needed.
     for (KeyValueScanner kvs : allScanners) {
       boolean isFile = kvs.isFileScanner();
       if ((!isFile && filesOnly) || (isFile && memOnly)) {
-- 
2.10.1 (Apple Git-78)
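A sketch of the client-side workload the patch targets: a recently written row read back with an explicit qualifier, so that all requested versions are still in the memstore (table, family and qualifier names are made up; standard client API):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReadYourWritesDemo {
      public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection();
            Table table = conn.getTable(TableName.valueOf("t"))) {
          byte[] row = Bytes.toBytes("row1");
          byte[] f = Bytes.toBytes("f");
          byte[] q = Bytes.toBytes("q1");
          table.put(new Put(row).addColumn(f, q, Bytes.toBytes("v")));
          // Explicit qualifier + default maxVersions = 1: if the cell is still in
          // the memstore and the store's timestamps are monotonic, the region
          // server can answer this get without touching any store file.
          Result r = table.get(new Get(row).addColumn(f, q));
          System.out.println(Bytes.toString(r.getValue(f, q)));
        }
      }
    }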