From 4a4a2e87cbf971a5c602b8376855cb910fc3455f Mon Sep 17 00:00:00 2001 From: anastas Date: Thu, 23 Feb 2017 23:26:46 +0200 Subject: [PATCH] My squashed commits --- .../hbase/regionserver/AbstractMemStore.java | 3 +- .../hbase/regionserver/CompactingMemStore.java | 47 +++++++++++++++++----- .../apache/hadoop/hbase/regionserver/HRegion.java | 21 +++++++--- .../apache/hadoop/hbase/regionserver/HStore.java | 16 ++++++++ .../apache/hadoop/hbase/regionserver/MemStore.java | 12 ++++++ .../org/apache/hadoop/hbase/io/TestHeapSize.java | 9 +++-- .../regionserver/wal/AbstractTestWALReplay.java | 2 - 7 files changed, 88 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 225dd73..8e605e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -55,7 +55,8 @@ public abstract class AbstractMemStore implements MemStore { private volatile long timeOfOldestEdit; public final static long FIXED_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)); + .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + + (2 * Bytes.SIZEOF_LONG)); // snapshotId, timeOfOldestEdit public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e7f4a67..3d8b919 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -68,19 +68,24 @@ public class CompactingMemStore extends AbstractMemStore { private RegionServicesForStores regionServices; private CompactionPipeline pipeline; private MemStoreCompactor compactor; - + private boolean compositeSnapshot = true; private long inmemoryFlushSize; // the threshold on active size for in-memory flush private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); + + // inWalReplay is true while we are synchronously replaying the edits from WAL + private boolean inWalReplay = false; + @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); - private boolean compositeSnapshot = true; - public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD - + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, - // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction - + Bytes.SIZEOF_LONG // inmemoryFlushSize + + public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD + + 5 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, + // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction + + Bytes.SIZEOF_LONG // inmemoryFlushSize + + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction - + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD; + + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD); public CompactingMemStore(Configuration conf, CellComparator c, HStore store, RegionServicesForStores regionServices, @@ -232,6 +237,24 @@ public class CompactingMemStore extends AbstractMemStore { } } + /** + * This message intends to inform the MemStore that next coming updates + * are going to be part of the replaying edits from WAL + */ + @Override + public void startReplayingFromWAL() { + inWalReplay = true; + } + + /** + * This message intends to inform the MemStore that the replaying edits from WAL + * are done + */ + @Override + public void stopReplayingFromWAL() { + inWalReplay = false; + } + // the getSegments() method is used for tests only @VisibleForTesting @Override @@ -387,10 +410,14 @@ public class CompactingMemStore extends AbstractMemStore { } private boolean shouldFlushInMemory() { + if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold - // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude - // the insert of the active into the compaction pipeline - return (inMemoryFlushInProgress.compareAndSet(false,true)); + if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush + return false; // regardless the size + } + // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude + // the insert of the active into the compaction pipeline + return (inMemoryFlushInProgress.compareAndSet(false,true)); } return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c3db588..a5051b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -876,11 +876,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long maxSeqId = initializeStores(reporter, status); this.mvcc.advanceTo(maxSeqId); if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { - // Recover any edits if available. - maxSeqId = Math.max(maxSeqId, - replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); - // Make sure mvcc is up to max. - this.mvcc.advanceTo(maxSeqId); + List stores = this.getStores(); // update the stores that we are replaying + try { + for (Store store : stores) { + ((HStore) store).startReplayingFromWAL(); + } + // Recover any edits if available. + maxSeqId = Math.max(maxSeqId, + replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + // Make sure mvcc is up to max. + this.mvcc.advanceTo(maxSeqId); + } finally { + for (Store store : stores) { // update the stores that we are done replaying + ((HStore)store).stopReplayingFromWAL(); + } + } + } this.lastReplayedOpenRegionSeqId = maxSeqId; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 84253c8..e5264e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -663,6 +663,22 @@ public class HStore implements Store { } /** + * This message intends to inform the MemStore that next coming updates + * are going to be part of the replaying edits from WAL + */ + public void startReplayingFromWAL(){ + this.memstore.startReplayingFromWAL(); + } + + /** + * This message intends to inform the MemStore that the replaying edits from WAL + * are done + */ + public void stopReplayingFromWAL(){ + this.memstore.stopReplayingFromWAL(); + } + + /** * Adds a value to the memstore * @param cell * @param memstoreSize diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 38d3e44..8b0ad19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -127,4 +127,16 @@ public interface MemStore { /* Return true if the memstore may use some extra memory space*/ boolean isSloppy(); + + /** + * This message intends to inform the MemStore that next coming updates + * are going to be part of the replaying edits from WAL + */ + default void startReplayingFromWAL(){return;} + + /** + * This message intends to inform the MemStore that the replaying edits from WAL + * are done + */ + default void stopReplayingFromWAL(){return;} } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index ceaadbe..f4561aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.io.hfile.LruCachedBlock; import org.apache.hadoop.hbase.regionserver.*; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.junit.BeforeClass; import org.junit.Test; @@ -320,14 +321,14 @@ public class TestHeapSize { // CompactingMemStore Deep Overhead cl = CompactingMemStore.class; actual = CompactingMemStore.DEEP_OVERHEAD; - expected = ClassSize.estimateBase(cl, false); + expected = ClassSize.estimateBase(cl,false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(CompactionPipeline.class, false); - expected += ClassSize.estimateBase(LinkedList.class, false); - expected += ClassSize.estimateBase(LinkedList.class, false); + expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline + expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline expected += ClassSize.estimateBase(MemStoreCompactor.class, false); - expected += ClassSize.estimateBase(AtomicBoolean.class, false); + expected += ClassSize.estimateBase(AtomicBoolean.class, false);// inside MemStoreCompactor if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicBoolean.class, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 9ac07d7..90eacf0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -127,8 +127,6 @@ public abstract class AbstractTestWALReplay { Configuration conf = TEST_UTIL.getConfiguration(); // The below config supported by 0.20-append and CDH3b2 conf.setInt("dfs.client.block.recovery.retries", 2); - conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, - String.valueOf(MemoryCompactionPolicy.NONE)); TEST_UTIL.startMiniCluster(3); Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); -- 1.8.5.2 (Apple Git-48)