From 6bf6ab66aa285e60779db988c75787ed1ad97ba4 Mon Sep 17 00:00:00 2001 From: eshcar Date: Sun, 15 Jan 2017 19:56:40 +0200 Subject: [PATCH] HBASE-17407: Correct update of maxFlushedSeqId in HRegion --- .../hbase/regionserver/CompactingMemStore.java | 21 ++++++++++++++++----- .../hbase/regionserver/CompactionPipeline.java | 8 ++++++++ .../hadoop/hbase/regionserver/DefaultMemStore.java | 4 +++- .../apache/hadoop/hbase/regionserver/HRegion.java | 10 ++++------ .../apache/hadoop/hbase/regionserver/HStore.java | 4 ++-- .../apache/hadoop/hbase/regionserver/MemStore.java | 10 +++++----- .../hbase/regionserver/wal/AbstractFSWAL.java | 9 +++++++++ .../regionserver/wal/SequenceIdAccounting.java | 19 ++++++++++++++++--- .../hadoop/hbase/wal/DisabledWALProvider.java | 6 ++++++ .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 2 ++ 10 files changed, 71 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 99c1685..44a1684 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -123,13 +124,17 @@ public class CompactingMemStore extends AbstractMemStore { } /** - * This method is called when it is clear that the flush to disk is completed. - * The store may do any post-flush actions at this point. - * One example is to update the WAL with sequence number that is known only at the store level. + * This method is called before the flush is executed. + * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush + * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. */ @Override - public void finalizeFlush() { - updateLowestUnflushedSequenceIdInWAL(false); + public long preFlushSeqIDEstimation() { + Segment segment = getLastSegmentAfterFlush(); + if(segment == null) { + return HConstants.NO_SEQNUM; + } + return segment.getMinSequenceId(); } @Override @@ -340,6 +345,12 @@ public class CompactingMemStore extends AbstractMemStore { } } + private Segment getLastSegmentAfterFlush() { + Segment localActive = getActive(); + Segment tail = pipeline.getTail(); + return tail == null ? localActive : tail; + } + private byte[] getFamilyNameInBytes() { return store.getFamily().getName(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index fafdbee..4fb8c83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -255,6 +255,14 @@ public class CompactionPipeline { if(segment != null) pipeline.addLast(segment); } + public Segment getTail() { + List localCopy = getSegments(); + if(localCopy.isEmpty()) { + return null; + } + return localCopy.get(localCopy.size()-1); + } + private boolean addFirst(ImmutableSegment segment) { pipeline.addFirst(segment); return true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index d4e6e12..63af570 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -169,7 +170,8 @@ public class DefaultMemStore extends AbstractMemStore { } @Override - public void finalizeFlush() { + public long preFlushSeqIDEstimation() { + return HConstants.NO_SEQNUM; } @Override public boolean isSloppy() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a5172bb..f8f3375 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2412,9 +2412,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); MemstoreSize totalSizeOfFlushableStores = new MemstoreSize(); - Set flushedFamilyNames = new HashSet(); + Map flushedFamilyNamesToSeq = new HashMap<>(); for (Store store: storesToFlush) { - flushedFamilyNames.add(store.getFamily().getName()); + flushedFamilyNamesToSeq.put(store.getFamily().getName(), + ((HStore) store).preFlushSeqIDEstimation()); } TreeMap storeFlushCtxs @@ -2434,7 +2435,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { if (wal != null) { Long earliestUnflushedSequenceIdForTheRegion = - wal.startCacheFlush(encodedRegionName, flushedFamilyNames); + wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq); if (earliestUnflushedSequenceIdForTheRegion == null) { // This should never happen. This is how startCacheFlush signals flush cannot proceed. String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; @@ -2677,9 +2678,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // If we get to here, the HStores have been written. - for(Store storeToFlush :storesToFlush) { - ((HStore) storeToFlush).finalizeFlush(); - } if (wal != null) { wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 2a93b70..36b9425 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2510,8 +2510,8 @@ public class HStore implements Store { } } - public void finalizeFlush() { - memstore.finalizeFlush(); + public Long preFlushSeqIDEstimation() { + return memstore.preFlushSeqIDEstimation(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index b094476..38d3e44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -119,12 +119,12 @@ public interface MemStore { MemstoreSize size(); /** - * This method is called when it is clear that the flush to disk is completed. - * The store may do any post-flush actions at this point. - * One example is to update the wal with sequence number that is known only at the store level. + * This method is called before the flush is executed. + * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush + * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. */ - void finalizeFlush(); + long preFlushSeqIDEstimation(); - /* Return true if the memstore may need some extra memory space*/ + /* Return true if the memstore may use some extra memory space*/ boolean isSloppy(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 316e2f6..7e3bd59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -429,6 +429,15 @@ public abstract class AbstractFSWAL implements WAL { } @Override + public Long startCacheFlush(byte[] encodedRegionName, Map familyToSeq) { + if (!closeBarrier.beginOp()) { + LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); + return null; + } + return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); + } + + @Override public void completeCacheFlush(byte[] encodedRegionName) { this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); closeBarrier.endOp(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index 6e7ad9b..c9dbca0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -264,6 +264,14 @@ class SequenceIdAccounting { * oldest/lowest outstanding edit. */ Long startCacheFlush(final byte[] encodedRegionName, final Set families) { + Map familytoSeq = new HashMap<>(); + for (byte[] familyName : families){ + familytoSeq.put(familyName,HConstants.NO_SEQNUM); + } + return startCacheFlush(encodedRegionName,familytoSeq); + } + + Long startCacheFlush(final byte[] encodedRegionName, final Map familyToSeq) { Map oldSequenceIds = null; Long lowestUnflushedInRegion = HConstants.NO_SEQNUM; synchronized (tieLock) { @@ -273,9 +281,14 @@ class SequenceIdAccounting { // circumstance because another concurrent thread now may add sequenceids for this family // (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it // is fine because updates are blocked when this method is called. Make sure!!! - for (byte[] familyName : families) { - ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName); - Long seqId = m.remove(familyNameWrapper); + for (Map.Entry entry : familyToSeq.entrySet()) { + ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey()); + Long seqId = null; + if(entry.getValue() == HConstants.NO_SEQNUM) { + seqId = m.remove(familyNameWrapper); + } else { + seqId = m.replace(familyNameWrapper, entry.getValue()); + } if (seqId != null) { if (oldSequenceIds == null) { oldSequenceIds = new HashMap<>(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 7f10d7d..90f09d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -195,6 +196,11 @@ class DisabledWALProvider implements WALProvider { sync(); } + public Long startCacheFlush(final byte[] encodedRegionName, Map + flushedFamilyNamesToSeq) { + return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet()); + } + @Override public Long startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { if (closed.get()) return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 030d8b6..b7adc60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -161,6 +161,8 @@ public interface WAL extends Closeable { */ Long startCacheFlush(final byte[] encodedRegionName, Set families); + Long startCacheFlush(final byte[] encodedRegionName, Map familyToSeq); + /** * Complete the cache flush. * @param encodedRegionName Encoded region name. -- 2.10.1 (Apple Git-78)