From 3b8f27667cae0a181b1c1420663f44acf0fefc70 Mon Sep 17 00:00:00 2001 From: eshcar Date: Thu, 26 Jan 2017 11:14:26 +0200 Subject: [PATCH] HBASE-17339: Scan-Memory-First Optimization for Get Operations --- .../org/apache/hadoop/hbase/HTableDescriptor.java | 21 +++- .../java/org/apache/hadoop/hbase/client/Get.java | 50 +++++++++ .../hbase/regionserver/AbstractMemStore.java | 60 +++++++---- .../hbase/regionserver/CompactingMemStore.java | 27 +++-- .../regionserver/CompositeImmutableSegment.java | 22 +++- .../hadoop/hbase/regionserver/DefaultMemStore.java | 13 ++- .../apache/hadoop/hbase/regionserver/HRegion.java | 44 ++++++-- .../apache/hadoop/hbase/regionserver/HStore.java | 58 +++++++---- .../hbase/regionserver/ImmutableSegment.java | 5 + .../apache/hadoop/hbase/regionserver/MemStore.java | 8 ++ .../hadoop/hbase/regionserver/MutableSegment.java | 4 + .../hadoop/hbase/regionserver/RSRpcServices.java | 114 +++++++++++++++------ .../hadoop/hbase/regionserver/RegionScanner.java | 8 ++ .../apache/hadoop/hbase/regionserver/Segment.java | 1 + .../apache/hadoop/hbase/regionserver/Store.java | 8 ++ .../hadoop/hbase/regionserver/StoreScanner.java | 4 + .../apache/hadoop/hbase/HBaseTestingUtility.java | 12 ++- .../apache/hadoop/hbase/TestAcidGuarantees.java | 29 +++++- 18 files changed, 379 insertions(+), 109 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 60b85fe..cac4272 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -198,6 +198,10 @@ public class HTableDescriptor implements Comparable { /** Relative priority of the table used for rpc scheduling */ private static final int DEFAULT_PRIORITY = HConstants.NORMAL_QOS; + private static final String MEMORY_SCAN_OPTIMIZATION = "MEMORY_SCAN_OPTIMIZATION"; + private static final Bytes MEMORY_SCAN_OPTIMIZATION_KEY = + new Bytes(Bytes.toBytes(MEMORY_SCAN_OPTIMIZATION)); + /* * The below are ugly but better than creating them each time till we * replace booleans being saved as Strings with plain booleans. 
Need a @@ -236,6 +240,8 @@ public class HTableDescriptor implements Comparable { public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = true; + public static final Boolean DEFAULT_MEMORY_SCAN_OPTIMIZATION = false; + private final static Map DEFAULT_VALUES = new HashMap(); private final static Set RESERVED_KEYWORDS @@ -253,6 +259,7 @@ public class HTableDescriptor implements Comparable { DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION)); DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED)); DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY)); + DEFAULT_VALUES.put(MEMORY_SCAN_OPTIMIZATION, String.valueOf(DEFAULT_MEMORY_SCAN_OPTIMIZATION)); for (String s : DEFAULT_VALUES.keySet()) { RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))); } @@ -394,8 +401,7 @@ public class HTableDescriptor implements Comparable { * @param name */ private void setMetaFlags(final TableName name) { - setMetaRegion(isRootRegion() || - name.equals(TableName.META_TABLE_NAME)); + setMetaRegion(isRootRegion() || name.equals(TableName.META_TABLE_NAME)); } /** @@ -601,6 +607,17 @@ public class HTableDescriptor implements Comparable { } /** + * @return true iff scan-memory-first optimization of Get operations can be applied on the table + */ + public boolean getMemoryScanOptimization() { + return isSomething(MEMORY_SCAN_OPTIMIZATION_KEY, DEFAULT_MEMORY_SCAN_OPTIMIZATION); + } + + public HTableDescriptor setMemoryScanOptimization(final boolean memoryScanOptimization) { + return setValue(MEMORY_SCAN_OPTIMIZATION_KEY, memoryScanOptimization? TRUE: FALSE); + } + + /** * Check if the readOnly flag of the table is set. If the readOnly flag is * set then the contents of the table can only be read from but not modified. 
 *
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index 947b54a..bb8f1ab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -31,6 +32,8 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -533,4 +536,51 @@ public class Get extends Query
     return (Get) super.setIsolationLevel(level);
   }
 
+  public boolean shouldApplyMemoryScanOptimization() {
+    // TODO add a flag which indicates if the user wants to apply the optimization
+    for (Set<byte[]> columns : familyMap.values()) {
+      if (columns == null) return false; // no explicit set of columns -- cannot apply optimization
+    }
+    return true;
+  }
+
+  public boolean satisfiedWith(List<Cell> results) {
+    if (!shouldApplyMemoryScanOptimization()) return false;
+    Bytes[] columns = getColumns();
+    int[] counters = new int[columns.length];
+    // count the number of cells per qualifier in the list of columns
+    for (Cell cell : results) {
+      int index = 0;
+      for (Bytes col : columns) {
+        if (CellComparator.compareQualifiers(cell, col.get(), col.getOffset(), col.getLength()) ==
+            0) {
+          counters[index]++;
+        }
+        index++;
+      }
+    }
+    // verify each qualifier has a sufficient number of versions, as defined by the get operation
+    for (int i = 0; i < counters.length; i++) {
+      if (counters[i] < maxVersions) {
+        return false; // not enough versions
+      }
+    }
+    return true; // the get operation is satisfied with the result
+  }
+
+  // Returns the set of all column qualifiers requested by the get operation
+  private Bytes[] getColumns() {
+    Set<byte[]> allColumns = new HashSet<>();
+    for (Set<byte[]> columns : familyMap.values()) {
+      if (columns != null) {
+        allColumns.addAll(columns);
+      }
+    }
+    Bytes[] res = new Bytes[allColumns.size()];
+    int i = 0;
+    for (byte[] col : allColumns) {
+      res[i++] = new Bytes(col);
+    }
+    return res;
+  }
 }
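To make the eligibility rules above concrete, here is a minimal client-side sketch (illustrative only, not part of the patch; the table and column names are made up, and admin/table are assumed to be an already-open Admin and Table):

    // Opt the table in to the scan-memory-first optimization (per-table flag added above)
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
    htd.addFamily(new HColumnDescriptor("cf"));
    htd.setMemoryScanOptimization(true);
    admin.createTable(htd);

    // A get that names its columns explicitly is eligible:
    // shouldApplyMemoryScanOptimization() returns true
    Get get = new Get(Bytes.toBytes("row1"));
    get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
    Result result = table.get(get);
    // By contrast, get.addFamily(Bytes.toBytes("cf")) leaves the family's column set null,
    // so the server silently falls back to a regular get.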
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 225dd73..fa07973 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
@@ -53,24 +52,28 @@ public abstract class AbstractMemStore implements MemStore {
   protected volatile long snapshotId;
   // Used to track when to flush
   private volatile long timeOfOldestEdit;
+  // used to check if all timestamps in memstore are strictly greater than timestamps in files
+  private volatile long maxFlushedTimestamp;
 
-  public final static long FIXED_OVERHEAD = ClassSize
-      .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
+  public final static long FIXED_OVERHEAD =
+      ClassSize.align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE)
+          + (3 * Bytes.SIZEOF_LONG)); // snapshotId, timeOfOldestEdit and maxFlushedTimestamp
 
   public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
 
-  protected AbstractMemStore(final Configuration conf, final CellComparator c) {
+  protected AbstractMemStore(final Configuration conf, final CellComparator c,
+      final Long maxFlushedTimestamp) {
     this.conf = conf;
     this.comparator = c;
     resetActive();
     this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
     this.snapshotId = NO_SNAPSHOT_ID;
+    this.maxFlushedTimestamp = maxFlushedTimestamp;
   }
 
   protected void resetActive() {
     // Reset heap to not include any keys
-    this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
-    this.timeOfOldestEdit = Long.MAX_VALUE;
+    active = SegmentFactory.instance().createMutableSegment(conf, comparator);
+    timeOfOldestEdit = Long.MAX_VALUE;
   }
 
   /**
@@ -121,6 +124,24 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
   /**
+   * A store preserves monotonicity if all timestamps in memstore are strictly greater than all
+   * timestamps in store files.
+   * @return the maximal timestamp that was flushed to disk in this store, or null if monotonicity
+   * is not preserved
+   */
+  @Override
+  public Long getMaxFlushedTimestamp() {
+    List<Segment> segments = getSegments();
+    for (Segment segment : segments) {
+      if (segment.getMinTimestamp() < maxFlushedTimestamp) {
+        // timestamp overlap -- not monotonic
+        return null;
+      }
+    }
+    return maxFlushedTimestamp;
+  }
+
+  /**
    * @return Oldest timestamp of all the Cells in the MemStore
    */
   @Override
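A worked example of the invariant that getMaxFlushedTimestamp() guards (an illustrative sketch, not patch code; cellAt() is a hypothetical helper creating a cell with the given timestamp, and maxFlushedTimestamp is assumed to be 100 after the last flush -- it is updated in clearSnapshot(), shown next):

    memstore.add(cellAt(150));          // every memstore timestamp > 100: still monotonic
    memstore.getMaxFlushedTimestamp();  // returns 100 -- a memory-only read cannot miss
                                        // newer versions hiding in the store files
    memstore.add(cellAt(80));           // client-supplied timestamp older than a flushed cell
    memstore.getMaxFlushedTimestamp();  // returns null -- the memstore no longer strictly
                                        // dominates the files, so gets must scan everything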
@@ -135,18 +156,19 @@
    */
   @Override
   public void clearSnapshot(long id) throws UnexpectedStateException {
-    if (this.snapshotId == -1) return; // already cleared
-    if (this.snapshotId != id) {
-      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
-          + id);
+    if (snapshotId == -1) return; // already cleared
+    if (snapshotId != id) {
+      throw new UnexpectedStateException("Current snapshot id is " + snapshotId + ", passed " + id);
     }
     // OK. Passed in snapshot is same as current snapshot. If not-empty,
     // create a new snapshot and let the old one go.
     Segment oldSnapshot = this.snapshot;
-    if (!this.snapshot.isEmpty()) {
-      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
+    if (!snapshot.isEmpty()) {
+      // This is where the max flushed timestamp is maintained: everything in the outgoing
+      // snapshot is about to be flushed to disk
+      maxFlushedTimestamp = Math.max(maxFlushedTimestamp, snapshot.getMaxTimestamp());
+      snapshot = SegmentFactory.instance().createImmutableSegment(comparator);
     }
-    this.snapshotId = NO_SNAPSHOT_ID;
+    snapshotId = NO_SNAPSHOT_ID;
     oldSnapshot.close();
   }
 
@@ -159,13 +181,9 @@
   public String toString() {
     StringBuffer buf = new StringBuffer();
     int i = 1;
-    try {
-      for (Segment segment : getSegments()) {
-        buf.append("Segment (" + i + ") " + segment.toString() + "; ");
-        i++;
-      }
-    } catch (IOException e){
-      return e.toString();
+    for (Segment segment : getSegments()) {
+      buf.append("Segment (" + i + ") " + segment.toString() + "; ");
+      i++;
     }
     return buf.toString();
   }
@@ -307,6 +325,6 @@
   /**
    * @return an ordered list of segments from most recent to oldest in memstore
    */
-  protected abstract List<Segment> getSegments() throws IOException;
+  protected abstract List<Segment> getSegments();
 
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e7f4a67..dbff271 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -71,21 +71,21 @@ public class CompactingMemStore extends AbstractMemStore {
   private long inmemoryFlushSize; // the threshold on active size for in-memory flush
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
-  @VisibleForTesting
-  private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+  @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
   private boolean compositeSnapshot = true;
 
-  public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
-      + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
-      // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+  public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE
+      // Store, RegionServicesForStores, CompactionPipeline,
+      // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
       + Bytes.SIZEOF_LONG // inmemoryFlushSize
       + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
       + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD;
 
-  public CompactingMemStore(Configuration conf, CellComparator c,
+  public CompactingMemStore(final Configuration conf, final CellComparator c,
       HStore store, RegionServicesForStores regionServices,
-      MemoryCompactionPolicy compactionPolicy) throws IOException {
-    super(conf, c);
+      final MemoryCompactionPolicy compactionPolicy,
+      final Long maxFlushedTimestamp) throws IOException {
+    super(conf, c, maxFlushedTimestamp);
     this.store = store;
     this.regionServices = regionServices;
     this.pipeline = new CompactionPipeline(getRegionServices());
@@ -93,6 +93,12 @@
     initInmemoryFlushSize(conf);
   }
 
+  CompactingMemStore(final Configuration conf, final CellComparator c,
+      HStore store, RegionServicesForStores regionServices,
+      final
MemoryCompactionPolicy compactionPolicy) throws IOException { + this(conf, c, store, regionServices, compactionPolicy, 0L); + } + private void initInmemoryFlushSize(Configuration conf) { long memstoreFlushSize = getRegionServices().getMemstoreFlushSize(); int numStores = getRegionServices().getNumStores(); @@ -295,8 +301,7 @@ public class CompactingMemStore extends AbstractMemStore { public List getScanners(long readPt) throws IOException { List pipelineList = pipeline.getSegments(); int order = pipelineList.size() + snapshot.getNumOfSegments(); - // The list of elements in pipeline + the active element + the snapshot segment - // TODO : This will change when the snapshot is made of more than one element + // The list of elements in pipeline + the active element + the snapshot segments // The order is the Segment ordinal List list = new ArrayList(order+1); list.add(this.active.getScanner(readPt, order + 1)); @@ -308,6 +313,8 @@ public class CompactingMemStore extends AbstractMemStore { list.add(item.getScanner(readPt, order)); order--; } + // TODO check if we can change the implementation to return multiple scanners + // so we can later filter out each one of them and not either keep all or eliminate all return Collections. singletonList(new MemStoreScanner(getComparator(), list)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index 30d17fb..2705f89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -131,10 +131,6 @@ public class CompositeImmutableSegment extends ImmutableSegment { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } - public long getMinTimestamp(){ - throw new IllegalStateException("Not supported by CompositeImmutableScanner"); - } - /** * Creates the scanner for the given read point * @return a scanner for the given read point @@ -247,6 +243,24 @@ public class CompositeImmutableSegment extends ImmutableSegment { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } + @Override + public long getMaxTimestamp() { + long max = Long.MIN_VALUE; + for (Segment s : segments) { + max = Math.max(max, s.getMaxTimestamp()); + } + return max; + } + + @Override + public long getMinTimestamp() { + long min = Long.MAX_VALUE; + for (Segment s : segments) { + min = Math.min(min, s.getMinTimestamp()); + } + return min; + } + /** * @return a set of all cells in the segment */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 63af570..63aed19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -60,7 +60,7 @@ public class DefaultMemStore extends AbstractMemStore { /** * Default constructor. Used for tests. */ - public DefaultMemStore() { + DefaultMemStore() { this(HBaseConfiguration.create(), CellComparator.COMPARATOR); } @@ -68,8 +68,13 @@ public class DefaultMemStore extends AbstractMemStore { * Constructor. 
* @param c Comparator */ - public DefaultMemStore(final Configuration conf, final CellComparator c) { - super(conf, c); + public DefaultMemStore(final Configuration conf, final CellComparator c, + final Long maxFlushedTimestamp) { + super(conf, c, maxFlushedTimestamp); + } + + DefaultMemStore(final Configuration conf, final CellComparator c) { + this(conf, c, 0L); } void dump() { @@ -134,7 +139,7 @@ public class DefaultMemStore extends AbstractMemStore { } @Override - protected List getSegments() throws IOException { + protected List getSegments() { List list = new ArrayList(2); list.add(this.active); list.add(this.snapshot); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ac26649..cc7acfa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -72,7 +72,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -350,6 +349,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private volatile Optional configurationManager; + private boolean memoryScanOptimization = HTableDescriptor.DEFAULT_MEMORY_SCAN_OPTIMIZATION; + // Used for testing. private volatile Long timeoutForWriteLock = null; @@ -374,6 +375,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return minimumReadPoint; } + public boolean getMemoryScanOptimization() { + return memoryScanOptimization; + } + /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -767,6 +772,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.durability = htd.getDurability() == Durability.USE_DEFAULT ? 
        DEFAULT_DURABILITY : htd.getDurability();
+    this.memoryScanOptimization = htd.getMemoryScanOptimization();
     if (rsServices != null) {
       this.rsAccounting = this.rsServices.getRegionServerAccounting();
       // don't initialize coprocessors if not running within a regionserver
@@ -2755,12 +2761,6 @@
   }
 
   protected RegionScanner instantiateRegionScanner(Scan scan,
-      List<KeyValueScanner> additionalScanners) throws IOException {
-    return instantiateRegionScanner(scan, additionalScanners, HConstants.NO_NONCE,
-        HConstants.NO_NONCE);
-  }
-
-  protected RegionScanner instantiateRegionScanner(Scan scan,
       List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
     if (scan.isReversed()) {
       if (scan.getFilter() != null) {
@@ -5755,13 +5755,14 @@
     private final long maxResultSize;
     private final ScannerContext defaultScannerContext;
     private final FilterWrapper filter;
+    private Map<byte[], Long> storesMaxFlushedTimestamp;
 
     @Override
     public HRegionInfo getRegionInfo() {
       return region.getRegionInfo();
     }
 
-    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
+    protected RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
       this(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
@@ -5806,6 +5807,7 @@
     protected void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
         throws IOException {
+      storesMaxFlushedTimestamp = new TreeMap<>(Bytes.BYTES_COMPARATOR); // compare byte[] keys by content
       // Here we separate all scanners into two lists - scanner that provide data required
       // by the filter to operate (scanners list) and all others (joinedScanners list).
       List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
@@ -5825,6 +5827,7 @@
       KeyValueScanner scanner;
       try {
         scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        storesMaxFlushedTimestamp.put(entry.getKey(), store.getMaxFlushedTimestamp());
       } catch (FileNotFoundException e) {
         throw handleFileNotFound(e);
       }
@@ -6407,6 +6410,31 @@
     }
 
     @Override
+    public boolean testTSMonotonicity() {
+      assert storesMaxFlushedTimestamp != null;
+      for (Long ts : storesMaxFlushedTimestamp.values()) {
+        if (ts == null) {
+          return false; // null indicates the store does not preserve ts monotonicity
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public boolean recheckTSMonotonicity(InternalScan scan) {
+      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
+        Store store = stores.get(entry.getKey());
+        Long collectedTS = storesMaxFlushedTimestamp.get(entry.getKey());
+        assert collectedTS != null;
+        // compare by value: with boxed Longs, != tests object identity
+        if (!collectedTS.equals(store.getMaxFlushedTimestamp())) {
+          return false;
+        }
+      }
+      // the second collect saw the same max flushed ts in all stores as the first collect did
+      return true;
+    }
+
+    @Override
     public void run() throws IOException {
       // This is the RPC callback method executed.
We do the close in of the scanner in this // callback diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 84253c8..d4f38ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -244,26 +244,6 @@ public class HStore implements Store { // Why not just pass a HColumnDescriptor in here altogether? Even if have // to clone it? scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator); - String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName()); - MemoryCompactionPolicy inMemoryCompaction = family.getInMemoryCompaction(); - if(inMemoryCompaction == null) { - inMemoryCompaction = MemoryCompactionPolicy.valueOf( - conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, - CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT)); - } - switch (inMemoryCompaction) { - case BASIC : - case EAGER : - className = CompactingMemStore.class.getName(); - this.memstore = new CompactingMemStore(conf, this.comparator, this, - this.getHRegion().getRegionServicesForStores(), inMemoryCompaction); - break; - case NONE : - default: - this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { - Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator }); - } - LOG.info("Memstore class name is " + className); this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family @@ -289,6 +269,31 @@ public class HStore implements Store { this.storeEngine = createStoreEngine(this, this.conf, this.comparator); this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles()); + Long maxMemstoreTSInFiles = StoreFile.getMaxMemstoreTSInList( + this.storeEngine.getStoreFileManager().getStorefiles()); + + String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName()); + MemoryCompactionPolicy inMemoryCompaction = family.getInMemoryCompaction(); + if(inMemoryCompaction == null) { + inMemoryCompaction = MemoryCompactionPolicy.valueOf( + conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, + CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT)); + } + switch (inMemoryCompaction) { + case BASIC : + case EAGER : + className = CompactingMemStore.class.getName(); + this.memstore = new CompactingMemStore(conf, this.comparator, this, + this.getHRegion().getRegionServicesForStores(), inMemoryCompaction, maxMemstoreTSInFiles); + break; + case NONE : + default: + this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { + Configuration.class, CellComparator.class , Long.class }, + new Object[] { conf, this.comparator, maxMemstoreTSInFiles }); + } + LOG.info("Memstore class name is " + className); + // Initialize checksum type from name. The names are CRC32, CRC32C, etc. this.checksumType = getChecksumType(conf); // initilize bytes per checksum @@ -2501,7 +2506,18 @@ public class HStore implements Store { @Override public boolean isSloppyMemstore() { - return this.memstore.isSloppy(); + return memstore.isSloppy(); + } + + /** + * A store preserves monotonicity if all timestamps in memstore are strictly greater than all + * timestamps in store files. 
+ * @return maximal timestamp that was flushed to disk in this store or null if monotonicity is not + * preserved + */ + @Override + public Long getMaxFlushedTimestamp() { + return memstore.getMaxFlushedTimestamp(); } private void clearCompactedfiles(final List filesToRemove) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index faa9b67..76b7e84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -152,6 +152,11 @@ public class ImmutableSegment extends Segment { return this.timeRange.getMin(); } + @Override + public long getMaxTimestamp() { + return this.timeRange.getMax(); + } + public int getNumOfSegments() { return 1; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 38d3e44..96ff3ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -127,4 +127,12 @@ public interface MemStore { /* Return true if the memstore may use some extra memory space*/ boolean isSloppy(); + + /** + * A store preserves monotonicity if all timestamps in memstore are strictly greater than all + * timestamps in store files. + * @return maximal timestamp that was flushed to disk in this store or null if monotonicity is not + * preserved + */ + Long getMaxFlushedTimestamp(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index 3dbd7ad..b472f87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -117,6 +117,10 @@ public class MutableSegment extends Segment { && (this.timeRangeTracker.getMax() >= oldestUnexpiredTS)); } + @Override public long getMaxTimestamp() { + return this.timeRangeTracker.getMax(); + } + @Override public long getMinTimestamp() { return this.timeRangeTracker.getMin(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index ae4e49d..86eba49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -38,6 +38,7 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -602,7 +603,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * Execute an append mutation. 
* * @param region - * @param m * @param cellScanner * @return result to return to client if default operation should be * bypassed as indicated by RegionObserver, null otherwise @@ -1085,18 +1085,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, "' configuration property.", be.getCause() != null ? be.getCause() : be); } - scannerLeaseTimeoutPeriod = rs.conf.getInt( - HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - maxScannerResultSize = rs.conf.getLong( - HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); - rpcTimeout = rs.conf.getInt( - HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - minimumScanTimeLimitDelta = rs.conf.getLong( - REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, - DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); + scannerLeaseTimeoutPeriod = rs.conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + maxScannerResultSize = rs.conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); + rpcTimeout = rs.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + minimumScanTimeLimitDelta = rs.conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { @@ -2311,9 +2307,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private static AtomicInteger MEMORY_SCANS = new AtomicInteger(0); + private static AtomicInteger FULL_SCANS = new AtomicInteger(0); + + private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, RpcCallContext context) throws IOException { + boolean memoryScanOptimization = region.getMemoryScanOptimization(); region.prepareGet(get); + boolean applyMemoryScanOptimization = + memoryScanOptimization && get.shouldApplyMemoryScanOptimization(); List results = new ArrayList(); boolean stale = region.getRegionInfo().getReplicaId() != 0; // pre-get CP hook @@ -2324,31 +2327,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } long before = EnvironmentEdgeManager.currentTime(); - Scan scan = new Scan(get); - if (scan.getLoadColumnFamiliesOnDemandValue() == null) { - scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); - } RegionScanner scanner = null; + RegionScanner internalScanner = null; + boolean monotonic = false; try { - scanner = region.getScanner(scan); - scanner.next(results); + int memScansCount = 0; + if(applyMemoryScanOptimization) { + InternalScan internalScan = new InternalScan(get); + internalScan.checkOnlyMemStore(); + if (internalScan.getLoadColumnFamiliesOnDemandValue() == null) { + internalScan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); + } + internalScanner = region.getScanner(internalScan); + // here internalScanner has a view of max flushed timestamps in all scanned stores and an + // indication if all scanned stores maintain monotonicity + if(internalScanner.testTSMonotonicity()) { + internalScanner.next(results); + memScansCount = MEMORY_SCANS.incrementAndGet(); + // double-collect on max flushed timestamps + monotonic = internalScanner.recheckTSMonotonicity(internalScan); + } + } + if(!applyMemoryScanOptimization + || !monotonic // failed monotonicity test + || 
!get.satisfiedWith(results) // the memstore alone did not satisfy the get
+          ) {
+        Scan scan = new Scan(get);
+        if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
+          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+        }
+        results.clear(); // discard partial memory-only results before the full scan
+        scanner = region.getScanner(scan);
+        scanner.next(results);
+        if (memoryScanOptimization) {
+          int fullScansCount = FULL_SCANS.incrementAndGet();
+          if (fullScansCount % 1000 == 0) {
+            LOG.info("Scan-memory-first Get stats: memScansCount="
+                + memScansCount + " fullScansCount=" + fullScansCount);
+          }
+        }
+      }
     } finally {
       if (scanner != null) {
-        if (closeCallBack == null) {
-          // If there is a context then the scanner can be added to the current
-          // RpcCallContext. The rpc callback will take care of closing the
-          // scanner, for eg in case
-          // of get()
-          assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback;
-          context.setCallBack((RegionScannerImpl) scanner);
-        } else {
-          // The call is from multi() where the results from the get() are
-          // aggregated and then send out to the
-          // rpc. The rpccall back will close all such scanners created as part
-          // of multi().
-          closeCallBack.addScanner(scanner);
+        // Executed a full scan:
+        // (1) add the full scan to the callback context
+        // (2) close the memory scan
+        addScannerToCallBackContext(closeCallBack, context, scanner);
+        if (internalScanner != null) {
+          internalScanner.close();
+        }
       }
+      // Only executed a memory scan:
+      // add the memory scan to the callback context; it is closed there
+      else if (internalScanner != null) {
+        addScannerToCallBackContext(closeCallBack, context, internalScanner);
+      }
     }
 
     // post-get CP hook
@@ -2359,6 +2391,24 @@
     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
   }
 
+  private void addScannerToCallBackContext(RegionScannersCloseCallBack closeCallBack,
+      RpcCallContext context, RegionScanner scanner) {
+    if (closeCallBack == null) {
+      // If there is a context then the scanner can be added to the current
+      // RpcCallContext. The rpc callback will take care of closing the
+      // scanner, e.g. in the case
+      // of get()
+      assert scanner instanceof RpcCallback;
+      context.setCallBack((RegionScannerImpl) scanner);
+    } else {
+      // The call is from multi() where the results from the get() are
+      // aggregated and then sent out to the
+      // rpc. The rpc callback will close all such scanners created as part
+      // of multi().
+      closeCallBack.addScanner(scanner);
+    }
+  }
+
   /**
    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
    *
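The control flow of the new get() path condenses to the sketch below (a restatement for clarity, not the exact code -- the real method also runs coprocessor hooks, updates the scan counters, and hands scanners to the callback-context bookkeeping shown above):

    // Returns true iff the get was fully answered from memory; caller falls back otherwise.
    private boolean tryMemoryOnlyGet(HRegion region, Get get, List<Cell> results)
        throws IOException {
      InternalScan memOnly = new InternalScan(get);
      memOnly.checkOnlyMemStore();                        // restrict the scan to memstore segments
      RegionScanner scanner = region.getScanner(memOnly); // collects per-store max flushed ts
      try {
        if (!scanner.testTSMonotonicity()) return false;  // some store had timestamp overlap
        scanner.next(results);                            // read from memory alone
        if (!scanner.recheckTSMonotonicity(memOnly)) {    // a concurrent flush raced the read
          results.clear();
          return false;
        }
        return get.satisfiedWith(results); // enough versions for every requested column?
      } finally {
        scanner.close();
      }
    }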
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 8d8c051..fafe2af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -124,4 +124,12 @@ public interface RegionScanner extends InternalScanner, Shipper {
   default void shipped() throws IOException {
     // do nothing
   }
+
+  default boolean testTSMonotonicity() {
+    return false; // by default a region scanner makes no monotonicity guarantee
+  }
+
+  default boolean recheckTSMonotonicity(InternalScan scan) {
+    return false; // by default re-validation fails, forcing a full scan
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 8581517..036f861 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -180,6 +180,7 @@ public abstract class Segment {
 
   public abstract boolean shouldSeek(Scan scan, long oldestUnexpiredTS);
 
+  public abstract long getMaxTimestamp();
   public abstract long getMinTimestamp();
 
   public boolean isTagsPresent() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index bb9e20a..79eed6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -535,4 +535,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    * @return true if the memstore may need some extra memory space
    */
   boolean isSloppyMemstore();
+
+  /**
+   * A store preserves monotonicity if all timestamps in memstore are strictly greater than all
+   * timestamps in store files.
+   * @return maximal timestamp that was flushed to disk in this store or null if monotonicity is not
+   * preserved
+   */
+  Long getMaxFlushedTimestamp();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 8c48aef..8279021 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -436,6 +436,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS: Long.MIN_VALUE;
 
     // include only those scan files which pass all filters
+    // TODO check if we can change the implementation to return multiple scanners from a memstore
+    // so that each of them can be filtered out here individually, rather than keeping or
+    // eliminating all of them. For historical reasons MemStore (both default and Compacting)
+    // returns a singleton list comprising a single MemStoreScanner. Perhaps this is not needed.
end of TODO for (KeyValueScanner kvs : allScanners) { boolean isFile = kvs.isFileScanner(); if ((!isFile && filesOnly) || (isFile && memOnly)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 4539f97..04c3b30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -158,7 +158,7 @@ import org.apache.zookeeper.ZooKeeper.States; @InterfaceStability.Evolving @SuppressWarnings("deprecation") public class HBaseTestingUtility extends HBaseCommonTestingUtility { - private MiniZooKeeperCluster zkCluster = null; + private MiniZooKeeperCluster zkCluster = null; public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server"; /** @@ -170,6 +170,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table"; public static final boolean PRESPLIT_TEST_TABLE = true; + + public static final String TESTING_MEMORY_SCAN_OPTIMIZATION = + "hbase.test.memory.scan.optimization"; + /** * Set if we were passed a zkCluster. If so, we won't shutdown zk as * part of general shutdown. @@ -363,7 +367,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Close both the region {@code r} and it's underlying WAL. For use in tests. */ public static void closeRegionAndWAL(final Region r) throws IOException { - closeRegionAndWAL((HRegion)r); + closeRegionAndWAL((HRegion) r); } /** @@ -1487,6 +1491,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { hcd.setBloomFilterType(BloomType.NONE); htd.addFamily(hcd); } + String opt = c.get(TESTING_MEMORY_SCAN_OPTIMIZATION); + if( opt != null) { + htd.setMemoryScanOptimization(Boolean.valueOf(opt)); + } getAdmin().createTable(htd, splitKeys); // HBaseAdmin only waits for regions to appear in hbase:meta // we should wait until they are assigned diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java index 59ca64c..596aecf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java @@ -49,6 +49,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Test case that uses multiple threads to read and write multifamily rows @@ -58,10 +60,17 @@ import com.google.common.collect.Lists; * a real cluster (eg for testing with failures, region movement, etc) */ @Category({FlakeyTests.class, MediumTests.class}) +@RunWith(Parameterized.class) public class TestAcidGuarantees implements Tool { + + @Parameterized.Parameters + public static Object[] data() { + return new Object[] { Boolean.FALSE, Boolean.TRUE }; + } + protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class); public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees"); - public static final byte [] FAMILY_A = Bytes.toBytes("A"); + public static final byte[] FAMILY_A = Bytes.toBytes("A"); public static final byte [] FAMILY_B = Bytes.toBytes("B"); public static final byte [] FAMILY_C = Bytes.toBytes("C"); public static final byte [] QUALIFIER_NAME = 
Bytes.toBytes("data"); @@ -93,16 +102,18 @@ public class TestAcidGuarantees implements Tool { } } - public TestAcidGuarantees() { + public TestAcidGuarantees(Boolean memoryScanOptimization) throws Exception { // Set small flush size for minicluster so we exercise reseeking scanners Configuration conf = HBaseConfiguration.create(); - conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024)); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024)); // prevent aggressive region split conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, - ConstantSizeRegionSplitPolicy.class.getName()); + ConstantSizeRegionSplitPolicy.class.getName()); conf.setInt("hfile.format.version", 3); // for mob tests conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(MemoryCompactionPolicy.NONE)); + conf.set(HBaseTestingUtility.TESTING_MEMORY_SCAN_OPTIMIZATION, + String.valueOf(memoryScanOptimization)); util = new HBaseTestingUtility(conf); } @@ -179,6 +190,13 @@ public class TestAcidGuarantees implements Tool { public void doAnAction() throws Exception { Get g = new Get(targetRow); + // specify list of all columns so we can test scan-memory-first optimization + for (byte[] family : targetFamilies) { + for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { + byte qualifier[] = Bytes.toBytes("col" + i); + g.addColumn(family, qualifier); + } + } Result res = table.get(g); byte[] gotValue = null; if (res.getRow() == null) { @@ -488,7 +506,8 @@ public class TestAcidGuarantees implements Tool { Configuration c = HBaseConfiguration.create(); int status; try { - TestAcidGuarantees test = new TestAcidGuarantees(); + TestAcidGuarantees test = + new TestAcidGuarantees(HTableDescriptor.DEFAULT_MEMORY_SCAN_OPTIMIZATION); status = ToolRunner.run(c, test, args); } catch (Exception e) { LOG.error("Exiting due to error", e); -- 2.10.1 (Apple Git-78)