From ecdd7efacad84c94e49cf4b4243c5a738202c2b3 Mon Sep 17 00:00:00 2001 From: anastas Date: Sun, 19 Feb 2017 18:36:38 +0200 Subject: [PATCH] My squashed commits --- .../hbase/regionserver/CompactingMemStore.java | 32 ++++++++++++++++++++-- .../apache/hadoop/hbase/regionserver/HRegion.java | 7 +++++ .../apache/hadoop/hbase/regionserver/HStore.java | 16 +++++++++++ .../apache/hadoop/hbase/regionserver/MemStore.java | 12 ++++++++ 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e7f4a67..e884f7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -71,15 +71,20 @@ public class CompactingMemStore extends AbstractMemStore { private long inmemoryFlushSize; // the threshold on active size for in-memory flush private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); + + // inWalReplay is true while we are synchronously replaying the edits from WAL + private final AtomicBoolean inWalReplay = new AtomicBoolean(false); + @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); private boolean compositeSnapshot = true; public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD - + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, - // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction + + 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, + // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction, + // inWalReplay + Bytes.SIZEOF_LONG // inmemoryFlushSize - + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction + + 3 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress, inWalReplay and allowCompaction + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD; public CompactingMemStore(Configuration conf, CellComparator c, @@ -232,6 +237,24 @@ public class CompactingMemStore extends AbstractMemStore { } } + /** + * This message intends to inform the MemStore that next coming updates + * are going to be part of the replaying edits from WAL + */ + @Override + public void startReplayingFromWAL() { + inWalReplay.compareAndSet(false,true); + } + + /** + * This message intends to inform the MemStore that the replaying edits from WAL + * are done + */ + @Override + public void stopReplayingFromWAL() { + inWalReplay.compareAndSet(true, false); + } + // the getSegments() method is used for tests only @VisibleForTesting @Override @@ -387,6 +410,9 @@ public class CompactingMemStore extends AbstractMemStore { } private boolean shouldFlushInMemory() { + if (inWalReplay.get()) { // when replaying edits from WAL there is no need in in-memory flush + return false; // regardless the size + } if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude // the insert of the active into the compaction pipeline diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c3db588..20a7849 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -876,11 +876,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long maxSeqId = initializeStores(reporter, status); this.mvcc.advanceTo(maxSeqId); if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { + List stores = this.getStores(); // update the stores that we are replaying + for (Store store : stores) { + ((HStore)store).startReplayingFromWAL(); + } // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); // Make sure mvcc is up to max. this.mvcc.advanceTo(maxSeqId); + for (Store store : stores) { // update the stores that we are done replaying + ((HStore)store).stopReplayingFromWAL(); + } } this.lastReplayedOpenRegionSeqId = maxSeqId; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 84253c8..e5264e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -663,6 +663,22 @@ public class HStore implements Store { } /** + * This message intends to inform the MemStore that next coming updates + * are going to be part of the replaying edits from WAL + */ + public void startReplayingFromWAL(){ + this.memstore.startReplayingFromWAL(); + } + + /** + * This message intends to inform the MemStore that the replaying edits from WAL + * are done + */ + public void stopReplayingFromWAL(){ + this.memstore.stopReplayingFromWAL(); + } + + /** * Adds a value to the memstore * @param cell * @param memstoreSize diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 38d3e44..8b0ad19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -127,4 +127,16 @@ public interface MemStore { /* Return true if the memstore may use some extra memory space*/ boolean isSloppy(); + + /** + * This message intends to inform the MemStore that next coming updates + * are going to be part of the replaying edits from WAL + */ + default void startReplayingFromWAL(){return;} + + /** + * This message intends to inform the MemStore that the replaying edits from WAL + * are done + */ + default void stopReplayingFromWAL(){return;} } -- 1.8.5.2 (Apple Git-48)