From 71e2a64fdc77ec3d7170af320b39671259604136 Mon Sep 17 00:00:00 2001 From: anastas Date: Wed, 1 Mar 2017 10:01:30 +0200 Subject: [PATCH] HBASE-17662 Disable in-memory flush when replaying from WAL --- .../hbase/regionserver/AbstractMemStore.java | 5 ++- .../hbase/regionserver/CompactingMemStore.java | 43 ++++++++++++++++++---- .../hadoop/hbase/regionserver/DefaultMemStore.java | 3 ++ .../apache/hadoop/hbase/regionserver/HRegion.java | 21 ++++++++--- .../apache/hadoop/hbase/regionserver/HStore.java | 16 ++++++++ .../apache/hadoop/hbase/regionserver/MemStore.java | 12 ++++++ .../org/apache/hadoop/hbase/io/TestHeapSize.java | 6 +-- .../regionserver/wal/AbstractTestWALReplay.java | 2 - 8 files changed, 88 insertions(+), 20 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 225dd73..6c7886f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -54,8 +54,9 @@ public abstract class AbstractMemStore implements MemStore { // Used to track when to flush private volatile long timeOfOldestEdit; - public final static long FIXED_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)); + public final static long FIXED_OVERHEAD = ClassSize.OBJECT + + (4 * ClassSize.REFERENCE) + + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e7f4a67..312e9fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -71,16 +71,22 @@ public class CompactingMemStore extends AbstractMemStore { private long inmemoryFlushSize; // the threshold on active size for in-memory flush private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); + + // inWalReplay is true while we are synchronously replaying the edits from WAL + private boolean inWalReplay = false; + @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); private boolean compositeSnapshot = true; - public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD - + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, - // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction - + Bytes.SIZEOF_LONG // inmemoryFlushSize + + public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD + + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, + // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction + + Bytes.SIZEOF_LONG // inmemoryFlushSize + + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction - + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD; + + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD); public CompactingMemStore(Configuration conf, CellComparator c, HStore store, RegionServicesForStores regionServices, @@ -232,6 +238,24 @@ public class CompactingMemStore extends AbstractMemStore { } } + /** + * Informs the MemStore that the upcoming updates + * are part of replaying edits from the WAL + */ + @Override + public void startReplayingFromWAL() { + inWalReplay = true; + } + + /** + * Informs the MemStore that replaying edits from the WAL + * is done + */ + @Override + public void
stopReplayingFromWAL() { + inWalReplay = false; + } + // the getSegments() method is used for tests only @VisibleForTesting @Override @@ -388,9 +412,12 @@ public class CompactingMemStore extends AbstractMemStore { private boolean shouldFlushInMemory() { if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold - // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude - // the insert of the active into the compaction pipeline - return (inMemoryFlushInProgress.compareAndSet(false,true)); + if (inWalReplay) { // when replaying edits from WAL there is no need for in-memory flush + return false; // regardless of the size + } + // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude + // the insert of the active into the compaction pipeline + return (inMemoryFlushInProgress.compareAndSet(false,true)); } return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 63af570..a31c2c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; public class DefaultMemStore extends AbstractMemStore { private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); + public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD); + public final static long FIXED_OVERHEAD =
ClassSize.align(AbstractMemStore.FIXED_OVERHEAD); /** * Default constructor. Used for tests. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a4dc974..cc32179 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -878,11 +878,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long maxSeqId = initializeStores(reporter, status); this.mvcc.advanceTo(maxSeqId); if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { - // Recover any edits if available. - maxSeqId = Math.max(maxSeqId, - replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); - // Make sure mvcc is up to max. - this.mvcc.advanceTo(maxSeqId); + List<Store> stores = this.getStores(); // update the stores that we are replaying + try { + for (Store store : stores) { + ((HStore) store).startReplayingFromWAL(); + } + // Recover any edits if available. + maxSeqId = Math.max(maxSeqId, + replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + // Make sure mvcc is up to max.
+ this.mvcc.advanceTo(maxSeqId); + } finally { + for (Store store : stores) { // update the stores that we are done replaying + ((HStore)store).stopReplayingFromWAL(); + } + } + } this.lastReplayedOpenRegionSeqId = maxSeqId; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 4d2fea8..b74e635 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -663,6 +663,22 @@ public class HStore implements Store { } /** + * Informs the MemStore that the upcoming updates + * are part of replaying edits from the WAL + */ + public void startReplayingFromWAL(){ + this.memstore.startReplayingFromWAL(); + } + + /** + * Informs the MemStore that replaying edits from the WAL + * is done + */ + public void stopReplayingFromWAL(){ + this.memstore.stopReplayingFromWAL(); + } + + /** * Adds a value to the memstore * @param cell * @param memstoreSize diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 38d3e44..8b0ad19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -127,4 +127,16 @@ public interface MemStore { /* Return true if the memstore may use some extra memory space*/ boolean isSloppy(); + + /** + * Informs the MemStore that the upcoming updates + * are part of replaying edits from the WAL + */ + default void startReplayingFromWAL(){return;} + + /** + * Informs the MemStore that replaying edits from the WAL + * is done + */ + default void stopReplayingFromWAL(){return;} } diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index ceaadbe..2f33859 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -324,10 +324,10 @@ public class TestHeapSize { expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(CompactionPipeline.class, false); - expected += ClassSize.estimateBase(LinkedList.class, false); - expected += ClassSize.estimateBase(LinkedList.class, false); + expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline + expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline expected += ClassSize.estimateBase(MemStoreCompactor.class, false); - expected += ClassSize.estimateBase(AtomicBoolean.class, false); + expected += ClassSize.estimateBase(AtomicBoolean.class, false);// inside MemStoreCompactor if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicBoolean.class, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 9ac07d7..90eacf0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -127,8 +127,6 @@ public abstract class AbstractTestWALReplay { Configuration conf = TEST_UTIL.getConfiguration(); // The below config supported by 0.20-append and CDH3b2 conf.setInt("dfs.client.block.recovery.retries", 2); - conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, - String.valueOf(MemoryCompactionPolicy.NONE)); 
TEST_UTIL.startMiniCluster(3); Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); -- 1.8.5.2 (Apple Git-48)