From 31283c6bf7db589811f0b9b47eb17ae496c5a676 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 27 Mar 2017 18:16:06 +0800 Subject: [PATCH] HBASE-17633 Update unflushed sequence id in SequenceIdAccounting after flush with the minimum sequence id in memstore --- .../hadoop/hbase/util/ImmutableByteArray.java | 54 --- .../hbase/regionserver/AbstractMemStore.java | 4 +- .../hbase/regionserver/CompactingMemStore.java | 22 +- .../hbase/regionserver/CompactionPipeline.java | 7 +- .../hadoop/hbase/regionserver/DefaultMemStore.java | 11 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 156 ++++---- .../apache/hadoop/hbase/regionserver/HStore.java | 5 +- .../apache/hadoop/hbase/regionserver/MemStore.java | 8 +- .../hbase/regionserver/MemStoreCompactor.java | 2 +- .../apache/hadoop/hbase/regionserver/Segment.java | 17 +- .../apache/hadoop/hbase/regionserver/Store.java | 6 + .../hbase/regionserver/wal/AbstractFSWAL.java | 54 ++- .../hadoop/hbase/regionserver/wal/FSWALEntry.java | 39 +- .../regionserver/wal/SequenceIdAccounting.java | 400 +++++---------------- .../org/apache/hadoop/hbase/util/HashedBytes.java | 4 +- .../hadoop/hbase/wal/DisabledWALProvider.java | 24 +- .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 55 ++- .../hbase/regionserver/wal/AbstractTestFSWAL.java | 56 +-- .../regionserver/wal/AbstractTestWALReplay.java | 75 ++-- .../hadoop/hbase/regionserver/wal/TestFSHLog.java | 2 + .../hbase/regionserver/wal/TestFSWALEntry.java | 62 ---- .../regionserver/wal/TestSequenceIdAccounting.java | 120 +++---- .../hadoop/hbase/wal/TestFSHLogProvider.java | 178 ++++----- .../apache/hadoop/hbase/wal/TestWALFactory.java | 14 +- 24 files changed, 503 insertions(+), 872 deletions(-) delete mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSWALEntry.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java deleted file mode 100644 index afd1ebf..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Mainly used as keys for HashMap. 
- */ -@InterfaceAudience.Private -public final class ImmutableByteArray { - - private final byte[] b; - - private ImmutableByteArray(byte[] b) { - this.b = b; - } - - @Override - public int hashCode() { - return Bytes.hashCode(b); - } - - @Override - public boolean equals(Object obj) { - if (obj == null || obj.getClass() != ImmutableByteArray.class) { - return false; - } - return Bytes.equals(b, ((ImmutableByteArray) obj).b); - } - - public static ImmutableByteArray wrap(byte[] b) { - return new ImmutableByteArray(b); - } - - public String toStringUtf8() { - return Bytes.toString(b); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index cff2b27..48f59ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -90,10 +90,8 @@ public abstract class AbstractMemStore implements MemStore { /** * Updates the wal with the lowest sequence id (oldest entry) that is still in memory - * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or - * only if it is greater than the previous sequence id */ - public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); + public abstract void updateLowestUnflushedSequenceIdInWAL(); @Override public void add(Iterable cells, MemstoreSize memstoreSize) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 26b2f49..1741481 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -129,21 +129,10 @@ public class CompactingMemStore extends AbstractMemStore { return memstoreSize; } - /** - * This method is called before the flush is executed. - * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush - * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. - */ @Override - public long preFlushSeqIDEstimation() { - if(compositeSnapshot) { - return HConstants.NO_SEQNUM; - } + public long minSequenceId() { Segment segment = getLastSegment(); - if(segment == null) { - return HConstants.NO_SEQNUM; - } - return segment.getMinSequenceId(); + return segment != null ? segment.getMinSequenceId() : HConstants.NO_SEQNUM; } @Override @@ -225,14 +214,14 @@ public class CompactingMemStore extends AbstractMemStore { } @Override - public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) { + public void updateLowestUnflushedSequenceIdInWAL() { long minSequenceId = pipeline.getMinSequenceId(); - if(minSequenceId != Long.MAX_VALUE) { + if (minSequenceId > 0) { byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes(); byte[] familyName = getFamilyNameInBytes(); WAL WAL = getRegionServices().getWAL(); if (WAL != null) { - WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater); + WAL.updateStore(encodedRegionName, familyName, minSequenceId); } } } @@ -546,5 +535,4 @@ public class CompactingMemStore extends AbstractMemStore { msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? 
"true" : "false"); LOG.debug(msg); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index e64c0fb..426a0a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -224,12 +225,8 @@ public class CompactionPipeline { } public long getMinSequenceId() { - long minSequenceId = Long.MAX_VALUE; LinkedList localCopy = readOnlyCopy; - if (!localCopy.isEmpty()) { - minSequenceId = localCopy.getLast().getMinSequenceId(); - } - return minSequenceId; + return localCopy.isEmpty() ? HConstants.NO_SEQNUM : localCopy.getLast().getMinSequenceId(); } public MemstoreSize getTailSize() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index d1f6b1c..8925871 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -150,7 +149,8 @@ public class DefaultMemStore extends AbstractMemStore { getNextRow(cell, this.snapshot.getCellSet())); } - @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { + @Override + public void updateLowestUnflushedSequenceIdInWAL() { } @Override @@ -168,11 +168,12 @@ public class DefaultMemStore extends AbstractMemStore { } @Override - public long preFlushSeqIDEstimation() { - return HConstants.NO_SEQNUM; + public long minSequenceId() { + return this.active.getMinSequenceId(); } - @Override public boolean isSloppy() { + @Override + public boolean isSloppy() { return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8deb9f1..e0b6409 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -227,13 +227,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM; /** - * Record the sequence id of last flush operation. Can be in advance of - * {@link #maxFlushedSeqId} when flushing a single column family. In this case, - * {@link #maxFlushedSeqId} will be older than the oldest edit in memory. - */ - private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM; - - /** * The sequence id of the last replayed open region event from the primary region. 
This is used * to skip entries before this due to the possibility of replay edits coming out of order from * replication. @@ -510,37 +503,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final TreeMap storeFlushableSize; final long startTime; final long flushOpSeqId; - final long flushedSeqId; final MemstoreSize totalFlushableSize; /** Constructs an early exit case */ - PrepareFlushResult(FlushResult result, long flushSeqId) { - this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemstoreSize()); + PrepareFlushResult(FlushResult result, long flushOpSeqId) { + this(result, null, null, null, Math.max(0, flushOpSeqId), 0, new MemstoreSize()); } /** Constructs a successful prepare flush result */ - PrepareFlushResult( - TreeMap storeFlushCtxs, - TreeMap> committedFiles, - TreeMap storeFlushableSize, long startTime, long flushSeqId, - long flushedSeqId, MemstoreSize totalFlushableSize) { - this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime, - flushSeqId, flushedSeqId, totalFlushableSize); - } - - private PrepareFlushResult( - FlushResult result, - TreeMap storeFlushCtxs, - TreeMap> committedFiles, - TreeMap storeFlushableSize, long startTime, long flushSeqId, - long flushedSeqId, MemstoreSize totalFlushableSize) { + PrepareFlushResult(TreeMap storeFlushCtxs, + TreeMap> committedFiles, + TreeMap storeFlushableSize, long startTime, long flushOpSeqId, + MemstoreSize totalFlushableSize) { + this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId, + totalFlushableSize); + } + + private PrepareFlushResult(FlushResult result, + TreeMap storeFlushCtxs, + TreeMap> committedFiles, + TreeMap storeFlushableSize, long startTime, long flushOpSeqId, + MemstoreSize totalFlushableSize) { this.result = result; this.storeFlushCtxs = storeFlushCtxs; this.committedFiles = committedFiles; this.storeFlushableSize = storeFlushableSize; this.startTime = startTime; - this.flushOpSeqId = flushSeqId; - this.flushedSeqId = flushedSeqId; + this.flushOpSeqId = flushOpSeqId; this.totalFlushableSize = totalFlushableSize; } @@ -996,6 +985,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } boolean allStoresOpened = false; boolean hasSloppyStores = false; + WAL wal = getWAL(); try { for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) { Future future = completionService.take(); @@ -1004,8 +994,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (store.isSloppyMemstore()) { hasSloppyStores = true; } - long storeMaxSequenceId = store.getMaxSequenceId(); + if (wal != null) { + // store the lowestUnflushedSequenceId into WAL. This is the max sequence id in + // storefiles, so we need to plus one. + // 0 means the store does not have any store files yet so we also use 0. Do not use + // NO_SEQNUM as it usually means the value does not exist. + wal.updateStore(getRegionInfo().getEncodedNameAsBytes(), store.getFamily().getName(), + storeMaxSequenceId > 0 ? 
storeMaxSequenceId + 1 : 0); + } maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), storeMaxSequenceId); if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) { @@ -1686,7 +1683,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) { writeRegionCloseMarker(wal); } - + if (wal != null) { + wal.closeRegion(getRegionInfo().getEncodedNameAsBytes()); + } this.closed.set(true); if (!canFlush) { this.decrMemstoreSize(new MemstoreSize(memstoreDataSize.get(), getMemstoreHeapSize())); @@ -1859,17 +1858,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) { - long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId; byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes(); regionLoadBldr.clearStoreCompleteSequenceId(); - for (byte[] familyName : this.stores.keySet()) { - long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName); - // Subtract - 1 to go earlier than the current oldest, unflushed edit in memstore; this will - // give us a sequence id that is for sure flushed. We want edit replay to start after this - // sequence id in this region. If NO_SEQNUM, use the regions maximum flush id. - long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1; - regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.newBuilder() - .setFamilyName(UnsafeByteOperations.unsafeWrap(familyName)).setSequenceId(csid).build()); + // We do not maintain these information for secondary replica, so do not report it to master. + // This is not a big deal as we will not replay any edits for secondary replica. + if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) { + this.stores.values().forEach(store -> { + long earliest = store.minSequenceIdInMemstore(); + if (earliest == HConstants.NO_SEQNUM) { + earliest = wal.getEarliestMemstoreSeqNum(encodedRegionName, store.getFamily().getName()); + } + // Subtract - 1 to go earlier than the current oldest, unflushed edit in memstore; this will + // give us a sequence id that is for sure flushed. We want edit replay to start after this + // sequence id in this region. 
+ regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.newBuilder() + .setFamilyName(UnsafeByteOperations.unsafeWrap(store.getFamily().getName())) + .setSequenceId(earliest - 1).build()); + }); } return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId()); } @@ -2412,29 +2417,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); MemstoreSize totalSizeOfFlushableStores = new MemstoreSize(); - Map flushedFamilyNamesToSeq = new HashMap<>(); - for (Store store: storesToFlush) { - flushedFamilyNamesToSeq.put(store.getFamily().getName(), - ((HStore) store).preFlushSeqIDEstimation()); - } - - TreeMap storeFlushCtxs = new TreeMap<>(Bytes.BYTES_COMPARATOR); - TreeMap> committedFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR); - TreeMap storeFlushableSize = new TreeMap<>(Bytes.BYTES_COMPARATOR); + TreeMap storeFlushCtxs + = new TreeMap(Bytes.BYTES_COMPARATOR); + TreeMap> committedFiles = new TreeMap>( + Bytes.BYTES_COMPARATOR); + TreeMap storeFlushableSize + = new TreeMap(Bytes.BYTES_COMPARATOR); // The sequence id of this flush operation which is used to log FlushMarker and pass to // createFlushContext to use as the store file's sequence id. It can be in advance of edits // still in the memstore, edits that are in other column families yet to be flushed. long flushOpSeqId = HConstants.NO_SEQNUM; - // The max flushed sequence id after this flush operation completes. All edits in memstore - // will be in advance of this sequence id. - long flushedSeqId = HConstants.NO_SEQNUM; byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); try { if (wal != null) { - Long earliestUnflushedSequenceIdForTheRegion = - wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq); - if (earliestUnflushedSequenceIdForTheRegion == null) { - // This should never happen. This is how startCacheFlush signals flush cannot proceed. + if (!wal.startCacheFlush(encodedRegionName)) { + // This is how startCacheFlush signals flush cannot proceed. String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; status.setStatus(msg); return new PrepareFlushResult( @@ -2442,13 +2439,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi myseqid); } flushOpSeqId = getNextSequenceId(wal); - // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit - flushedSeqId = - earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM? - flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1; } else { // use the provided sequence Id as WAL is not being used for this flush. 
- flushedSeqId = flushOpSeqId = myseqid; + flushOpSeqId = myseqid; } for (Store s : storesToFlush) { @@ -2483,7 +2476,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus(s); doSyncOfUnflushedWALChanges(wal, getRegionInfo()); return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, - flushOpSeqId, flushedSeqId, totalSizeOfFlushableStores); + flushOpSeqId, totalSizeOfFlushableStores); } /** @@ -2583,7 +2576,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi TreeMap> committedFiles = prepareResult.committedFiles; long startTime = prepareResult.startTime; long flushOpSeqId = prepareResult.flushOpSeqId; - long flushedSeqId = prepareResult.flushedSeqId; String s = "Flushing stores of " + this; status.setStatus(s); @@ -2673,19 +2665,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw dse; } - // If we get to here, the HStores have been written. if (wal != null) { - wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); + // Let's construct the new lowestUnflushedSequenceId map for the flushed families + Map familyToLowestUnflushedSequenceId = new HashMap<>(storesToFlush.size()); + storesToFlush.forEach(store -> { + long lowestUnflushedSequenceId = store.minSequenceIdInMemstore(); + if (lowestUnflushedSequenceId < 0) { + lowestUnflushedSequenceId = flushOpSeqId; + } + familyToLowestUnflushedSequenceId.put(store.getFamily().getName(), lowestUnflushedSequenceId); + }); + wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), + familyToLowestUnflushedSequenceId); } // Record latest flush time for (Store store: storesToFlush) { this.lastStoreFlushTimeMap.put(store, startTime); } - - this.maxFlushedSeqId = flushedSeqId; - this.lastFlushOpSeqId = flushOpSeqId; + // minus one to get the flushed sequence id as the map is for unflushed sequence id. + if (wal != null) { + this.maxFlushedSeqId = + wal.getEarliestMemstoreSeqNum(this.getRegionInfo().getEncodedNameAsBytes()) - 1; + } else { + this.maxFlushedSeqId = flushOpSeqId; + } // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). @@ -4154,6 +4159,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (seqid > minSeqIdForTheRegion) { // Then we added some edits to memory. Flush and cleanup split edit files. internalFlushcache(null, seqid, stores.values(), status, false); + long flushedSeqId = seqid; + stores.values().stream().map(s -> s.getFamily().getName()).forEach( + fn -> wal.updateStore(getRegionInfo().getEncodedNameAsBytes(), fn, flushedSeqId + 1)); } // Now delete the content of recovered edits. We're done w/ them. if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { @@ -4552,7 +4560,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } else { // special case empty memstore. We will still save the flush result in this case, since - // our memstore ie empty, but the primary is still flushing + // our memstore is empty, but the primary is still flushing if (prepareResult.getResult().getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { this.writestate.flushing = true; @@ -4599,7 +4607,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // the memstore edits if everything in memstore is < y. 
This is the usual case for // RS crash + recovery where we might see consequtive prepare flush wal markers. // Otherwise, this will cause more memory to be used in secondary replica until a - // further prapare + commit flush is seen and replayed. + // further prepare + commit flush is seen and replayed. } } } finally { @@ -7613,7 +7621,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ClassSize.OBJECT + ClassSize.ARRAY + 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + - (14 * Bytes.SIZEOF_LONG) + + (13 * Bytes.SIZEOF_LONG) + 6 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: @@ -8054,7 +8062,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getOldestSeqIdOfStore(byte[] familyName) { - return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName); + Store store = getStore(familyName); + if (store == null) { + return HConstants.NO_SEQNUM; + } + long lowest = store.minSequenceIdInMemstore(); + if (lowest == HConstants.NO_SEQNUM) { + lowest = wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName); + } + return lowest; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index a988c5b..34edaa6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2509,8 +2509,9 @@ public class HStore implements Store { } } - public Long preFlushSeqIDEstimation() { - return memstore.preFlushSeqIDEstimation(); + @Override + public long minSequenceIdInMemstore() { + return memstore.minSequenceId(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 8b0ad19..b114dcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -119,11 +119,11 @@ public interface MemStore { MemstoreSize size(); /** - * This method is called before the flush is executed. - * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush - * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. + * The minimum sequence id of cells in the memstore, or -1 if there is no data. + *
<p>
+ * This method will be called after flush to track the minimum unflushed sequence id for a region. */ - long preFlushSeqIDEstimation(); + long minSequenceId(); /* Return true if the memstore may use some extra memory space*/ boolean isSloppy(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index dfa7d18..fd5fb8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -205,7 +205,7 @@ public class MemStoreCompactor { if (resultSwapped = compactingMemStore.swapCompactedSegments( versionedList, result, (action==Action.MERGE))) { // update the wal so it can be truncated and not get too long - compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater + compactingMemStore.updateLowestUnflushedSequenceIdInWAL(); // only if greater } } } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 6f431c9..94d566a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; + import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -29,15 +31,15 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import com.google.common.annotations.VisibleForTesting; - /** * This is an abstraction of a segment maintained in a memstore, e.g., the active * cell set or its snapshot. @@ -57,7 +59,7 @@ public abstract class Segment { private AtomicReference cellSet= new AtomicReference<>(); private final CellComparator comparator; - protected long minSequenceId; + protected final AtomicLong minSequenceId = new AtomicLong(Long.MAX_VALUE); private MemStoreLAB memStoreLAB; // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not // including the heap overhead of this class. @@ -79,7 +81,6 @@ public abstract class Segment { protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { this.cellSet.set(cellSet); this.comparator = comparator; - this.minSequenceId = Long.MAX_VALUE; this.memStoreLAB = memStoreLAB; this.dataSize = new AtomicLong(0); this.heapSize = new AtomicLong(0); @@ -90,7 +91,8 @@ public abstract class Segment { protected Segment(Segment segment) { this.cellSet.set(segment.getCellSet()); this.comparator = segment.getComparator(); - this.minSequenceId = segment.getMinSequenceId(); + long minSequenceId = segment.getMinSequenceId(); + this.minSequenceId.set(minSequenceId > 0 ? 
minSequenceId : Long.MAX_VALUE); this.memStoreLAB = segment.getMemStoreLAB(); this.dataSize = new AtomicLong(segment.keySize()); this.heapSize = new AtomicLong(segment.heapSize.get()); @@ -231,7 +233,8 @@ public abstract class Segment { } public long getMinSequenceId() { - return minSequenceId; + long minSequenceId = this.minSequenceId.get(); + return minSequenceId != Long.MAX_VALUE ? minSequenceId : HConstants.NO_SEQNUM; } public TimeRangeTracker getTimeRangeTracker() { @@ -294,7 +297,7 @@ public abstract class Segment { memstoreSize.incMemstoreSize(cellSize, heapSize); } getTimeRangeTracker().includeTimestamp(cellToAdd); - minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId()); + AtomicUtils.updateMin(minSequenceId, cellToAdd.getSequenceId()); // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. // When we use ACL CP or Visibility CP which deals with Tags during // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 76595f3..0f3fd0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -544,4 +544,10 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @return true if the memstore may need some extra memory space */ boolean isSloppyMemstore(); + + /** + * @return the minimum sequence id in memstore, or + * {@link org.apache.hadoop.hbase.HConstants#NO_SEQNUM} if no data in memstore. + */ + long minSequenceIdInMemstore(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index f32d0ed..91e3114 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static com.google.common.base.Preconditions.*; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.lmax.disruptor.RingBuffer; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.management.MemoryType; @@ -29,7 +34,6 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; @@ -74,9 +78,6 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.RingBuffer; - /** * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. 
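The hunks below rework the flush accounting contract of this class: startCacheFlush now only
checks the close barrier and answers whether a flush may begin, completeCacheFlush installs the
per-family lowest unflushed sequence ids computed after the flush, and updateStore becomes a
single-family variant of the same update. A minimal sketch of how a caller such as HRegion is
expected to drive these methods follows (the class name, the FlushAction stand-in for writing
hfiles, and the simplified error handling are assumptions, not patch code):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.wal.WAL;

// Illustrative sketch only; not part of this patch.
final class FlushAccountingSketch {

  /** Stands in for snapshotting memstores and writing hfiles. */
  interface FlushAction {
    void run() throws IOException;
  }

  static void flush(WAL wal, byte[] encodedRegionName, Iterable<Store> storesToFlush,
      long flushOpSeqId, FlushAction writeHFiles) throws IOException {
    if (!wal.startCacheFlush(encodedRegionName)) {
      return; // WAL is closing; the flush cannot start.
    }
    try {
      writeHFiles.run();
      // Publish the new lowest unflushed sequence id for each flushed family. If a
      // memstore is now empty, everything up to flushOpSeqId has been persisted.
      Map<byte[], Long> familyToLowest = new HashMap<>();
      for (Store store : storesToFlush) {
        long lowest = store.minSequenceIdInMemstore();
        familyToLowest.put(store.getFamily().getName(),
            lowest == HConstants.NO_SEQNUM ? flushOpSeqId : lowest);
      }
      wal.completeCacheFlush(encodedRegionName, familyToLowest);
    } catch (IOException e) {
      wal.abortCacheFlush(encodedRegionName);
      throw e;
    }
  }
}

Because completeCacheFlush now carries the post-flush lowest sequence ids directly, the old
flushingSequenceIds staging map and the tieLock that made the move between maps atomic are no
longer needed (see the SequenceIdAccounting hunks further down).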
@@ -433,33 +434,31 @@ public abstract class AbstractFSWAL implements WAL { } @Override - public Long startCacheFlush(byte[] encodedRegionName, Set families) { + public boolean startCacheFlush(byte[] encodedRegionName) { if (!closeBarrier.beginOp()) { LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); - return null; + return false; + } else { + return true; } - return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); } @Override - public Long startCacheFlush(byte[] encodedRegionName, Map familyToSeq) { - if (!closeBarrier.beginOp()) { - LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); - return null; - } - return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); + public void completeCacheFlush(byte[] encodedRegionName, + Map family2LowestUnflushedSequenceId) { + this.sequenceIdAccounting.updateLowestUnflushedSequenceIds(encodedRegionName, + family2LowestUnflushedSequenceId); + closeBarrier.endOp(); } @Override - public void completeCacheFlush(byte[] encodedRegionName) { - this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); + public void abortCacheFlush(byte[] encodedRegionName) { closeBarrier.endOp(); } @Override - public void abortCacheFlush(byte[] encodedRegionName) { - this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); - closeBarrier.endOp(); + public void closeRegion(byte[] encodedRegionName) { + this.sequenceIdAccounting.remove(encodedRegionName); } @Override @@ -843,18 +842,10 @@ public abstract class AbstractFSWAL implements WAL { LOG.info("Closed WAL: " + toString()); } - /** - * updates the sequence number of a specific store. depending on the flag: replaces current seq - * number if the given seq id is bigger, or even if it is lower than existing one - * @param encodedRegionName - * @param familyName - * @param sequenceid - * @param onlyIfGreater - */ @Override - public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, - boolean onlyIfGreater) { - sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); + public void updateStore(byte[] encodedRegionName, byte[] familyName, long sequenceId) { + sequenceIdAccounting.updateLowestUnflushedSequenceIds(encodedRegionName, + ImmutableMap.of(familyName, sequenceId)); } protected SyncFuture getSyncFuture(long sequence, Span span) { @@ -918,8 +909,7 @@ public abstract class AbstractFSWAL implements WAL { doAppend(writer, entry); assert highestUnsyncedTxid < entry.getTxid(); highestUnsyncedTxid = entry.getTxid(); - sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, - entry.isInMemstore()); + sequenceIdAccounting.update(encodedRegionName, regionSequenceId); coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. 
postAppend(entry, EnvironmentEdgeManager.currentTime() - start); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 3b8525c..fc8a960 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -18,23 +18,13 @@ package org.apache.hadoop.hbase.regionserver.wal; -import com.google.common.annotations.VisibleForTesting; - import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; -import static java.util.stream.Collectors.toCollection; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.htrace.Span; @@ -54,7 +44,6 @@ class FSWALEntry extends Entry { private final transient long txid; private final transient boolean inMemstore; private final transient HRegionInfo hri; - private final transient Set familyNames; // The tracing span for this entry when writing WAL. private transient Span span; @@ -65,28 +54,9 @@ class FSWALEntry extends Entry { this.inMemstore = inMemstore; this.hri = hri; this.txid = txid; - if (inMemstore) { - // construct familyNames here to reduce the work of log sinker. - this.familyNames = collectFamilies(edit.getCells()); - } else { - this.familyNames = Collections. emptySet(); - } - } - - @VisibleForTesting - static Set collectFamilies(List cells) { - if (CollectionUtils.isEmpty(cells)) { - return Collections. emptySet(); - } else { - return cells.stream() - .filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY)) - .collect(toCollection(() -> new TreeSet<>(CellComparator::compareFamilies))) - .stream() - .map(CellUtil::cloneFamily) - .collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR))); - } } + @Override public String toString() { return "sequence=" + this.txid + ", " + super.toString(); }; @@ -122,13 +92,6 @@ class FSWALEntry extends Entry { return regionSequenceId; } - /** - * @return the family names which are effected by this edit. 
- */ - Set getFamilyNames() { - return familyNames; - } - void attachSpan(Span span) { this.span = span; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index b065a59..61a8503 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -19,24 +19,20 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; -import com.google.common.annotations.VisibleForTesting; - import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ImmutableByteArray; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.hadoop.hbase.util.Pair; /** * Accounting of sequence ids per region and then by column family. So we can our accounting @@ -52,71 +48,46 @@ import org.apache.hadoop.hbase.util.ImmutableByteArray; @InterfaceAudience.Private class SequenceIdAccounting { - private static final Log LOG = LogFactory.getLog(SequenceIdAccounting.class); /** - * This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and - * {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the - * lowest outstanding sequence ids EXCEPT when flushing. When we flush, the current - * lowest set for the region/column family are moved (atomically because of this lock) to - * {@link #flushingSequenceIds}. - * - *
<p>
The two Maps are tied by this locking object EXCEPT when we go to update the lowest - * entry; see {@link #lowestUnflushedSequenceIds}. In here is a putIfAbsent call on - * {@link #lowestUnflushedSequenceIds}. In this latter case, we will add this lowest - * sequence id if we find that there is no entry for the current column family. There will be no - * entry only if we just came up OR we have moved aside current set of lowest sequence ids - * because the current set are being flushed (by putting them into {@link #flushingSequenceIds}). - * This is how we pick up the next 'lowest' sequence id per region per column family to be used - * figuring what is in the next flush. + * Map of encoded region names and family names to their OLDEST -- i.e. their first, the + * longest-lived, their 'earliest', the 'lowest' -- sequence id. + *
<p>
+ * This map will be updated in four places:
+ * <ul>
+ * <li>When a region opens, we will call {@code WAL.updateStore}, which adds a record to the
+ * map.</li>
+ * <li>After a flush, we will call {@code WAL.completeCacheFlush}, which adds records for the
+ * flushed families to the map.</li>
+ * <li>After an in-memory flush, we will call {@code WAL.updateStore}, which adds a record to
+ * the map.</li>
+ * <li>When a region closes, we will call {@code WAL.closeRegion}, which removes the records
+ * related to the region from the map.</li>
+ * </ul>
+ * The removal is necessary as we do not have other ways to clean up the records belong to regions + * that have already been moved to other RSes. */ - private final Object tieLock = new Object(); + private final ConcurrentMap> lowestUnflushedSequenceIds = + new ConcurrentHashMap<>(); /** - * Map of encoded region names and family names to their OLDEST -- i.e. their first, - * the longest-lived, their 'earliest', the 'lowest' -- sequence id. - * - *
<p>
When we flush, the current lowest sequence ids get cleared and added to - * {@link #flushingSequenceIds}. The next append that comes in, is then added - * here to {@link #lowestUnflushedSequenceIds} as the next lowest sequenceid. - * - *
<p>
If flush fails, currently server is aborted so no need to restore previous sequence ids. - *
<p>
Needs to be concurrent Maps because we use putIfAbsent updating oldest. + * Map of region encoded names to the latest/highest region sequence id. Updated on each call to + * append. + *
<p>
+ * This map uses byte[] as the key, and uses reference equality. It works in our use case as we + * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns + * the same array. */ - private final ConcurrentMap> - lowestUnflushedSequenceIds = new ConcurrentHashMap<>(); - - /** - * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id - * currently being flushed out to hfiles. Entries are moved here from - * {@link #lowestUnflushedSequenceIds} while the lock {@link #tieLock} is held - * (so movement between the Maps is atomic). - */ - private final Map> flushingSequenceIds = new HashMap<>(); - - /** - * Map of region encoded names to the latest/highest region sequence id. Updated on each - * call to append. - *
<p>
- * This map uses byte[] as the key, and uses reference equality. It works in our use case as we - * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns - * the same array. - */ - private Map highestSequenceIds = new HashMap<>(); + private Map highestSequenceIds = new HashMap<>(); /** * Returns the lowest unflushed sequence id for the region. * @param encodedRegionName - * @return Lowest outstanding unflushed sequenceid for encodedRegionName. Will - * return {@link HConstants#NO_SEQNUM} when none. + * @return Lowest outstanding unflushed sequenceid for encodedRegionName. Will return + * {@link HConstants#NO_SEQNUM} when none. */ - long getLowestSequenceId(final byte[] encodedRegionName) { - synchronized (this.tieLock) { - Map m = this.flushingSequenceIds.get(encodedRegionName); - long flushingLowest = m != null ? getLowestSequenceId(m) : Long.MAX_VALUE; - m = this.lowestUnflushedSequenceIds.get(encodedRegionName); - long unflushedLowest = m != null ? getLowestSequenceId(m) : HConstants.NO_SEQNUM; - return Math.min(flushingLowest, unflushedLowest); - } + long getLowestSequenceId(byte[] encodedRegionName) { + ConcurrentMap m = this.lowestUnflushedSequenceIds.get(encodedRegionName); + return m != null ? getLowestSequenceId(m) : HConstants.NO_SEQNUM; } /** @@ -126,97 +97,39 @@ class SequenceIdAccounting { * familyName. Returned sequenceid may be for an edit currently being * flushed. */ - long getLowestSequenceId(final byte[] encodedRegionName, final byte[] familyName) { - ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName); - synchronized (this.tieLock) { - Map m = this.flushingSequenceIds.get(encodedRegionName); - if (m != null) { - Long lowest = m.get(familyNameWrapper); - if (lowest != null) { - return lowest; - } - } - m = this.lowestUnflushedSequenceIds.get(encodedRegionName); - if (m != null) { - Long lowest = m.get(familyNameWrapper); - if (lowest != null) { - return lowest; - } - } + long getLowestSequenceId(byte[] encodedRegionName, byte[] familyName) { + ConcurrentMap m = this.lowestUnflushedSequenceIds.get(encodedRegionName); + if (m == null) { + return HConstants.NO_SEQNUM; } - return HConstants.NO_SEQNUM; + Long lowest = m.get(new HashedBytes(familyName)); + return lowest != null ? lowest.longValue() : HConstants.NO_SEQNUM; } /** * Reset the accounting of highest sequenceid by regionname. * @return Return the previous accounting Map of regions to the last sequence id written into - * each. + * each. */ Map resetHighest() { - Map old = this.highestSequenceIds; + Map old = this.highestSequenceIds; this.highestSequenceIds = new HashMap<>(); - return old; + return old.entrySet().stream().map(e -> Pair.newPair(e.getKey(), e.getValue().longValue())) + .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); } /** * We've been passed a new sequenceid for the region. Set it as highest seen for this region and - * if we are to record oldest, or lowest sequenceids, save it as oldest seen if nothing - * currently older. + * if we are to record oldest. * @param encodedRegionName - * @param families - * @param sequenceid - * @param lowest Whether to keep running account of oldest sequence id. 
- */ - void update(byte[] encodedRegionName, Set families, long sequenceid, - final boolean lowest) { - Long l = Long.valueOf(sequenceid); - this.highestSequenceIds.put(encodedRegionName, l); - if (lowest) { - ConcurrentMap m = getOrCreateLowestSequenceIds(encodedRegionName); - for (byte[] familyName : families) { - m.putIfAbsent(ImmutableByteArray.wrap(familyName), l); - } - } - } - - /** - * Update the store sequence id, e.g., upon executing in-memory compaction + * @param sequenceId */ - void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceId, - boolean onlyIfGreater) { - if (sequenceId == null) { - return; - } - Long highest = this.highestSequenceIds.get(encodedRegionName); - if (highest == null || sequenceId > highest) { - this.highestSequenceIds.put(encodedRegionName, sequenceId); - } - ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName); - synchronized (this.tieLock) { - ConcurrentMap m = getOrCreateLowestSequenceIds(encodedRegionName); - boolean replaced = false; - while (!replaced) { - Long oldSeqId = m.get(familyNameWrapper); - if (oldSeqId == null) { - m.put(familyNameWrapper, sequenceId); - replaced = true; - } else if (onlyIfGreater) { - if (sequenceId > oldSeqId) { - replaced = m.replace(familyNameWrapper, oldSeqId, sequenceId); - } else { - return; - } - } else { // replace even if sequence id is not greater than oldSeqId - m.put(familyNameWrapper, sequenceId); - return; - } - } - } + void update(byte[] encodedRegionName, long sequenceId) { + this.highestSequenceIds.computeIfAbsent(encodedRegionName, k -> new MutableLong()) + .setValue(sequenceId); } - @VisibleForTesting - ConcurrentMap getOrCreateLowestSequenceIds(byte[] encodedRegionName) { - // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append. + private ConcurrentMap getOrCreateLowestSequenceIds(byte[] encodedRegionName) { return computeIfAbsent(this.lowestUnflushedSequenceIds, encodedRegionName, ConcurrentHashMap::new); } @@ -227,7 +140,7 @@ class SequenceIdAccounting { */ private static long getLowestSequenceId(Map sequenceids) { long lowest = HConstants.NO_SEQNUM; - for (Long sid: sequenceids.values()) { + for (Long sid : sequenceids.values()) { if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) { lowest = sid.longValue(); } @@ -235,167 +148,42 @@ class SequenceIdAccounting { return lowest; } - /** - * @param src - * @return New Map that has same keys as src but instead of a Map for a value, it - * instead has found the smallest sequence id and it returns that as the value instead. - */ - private > Map flattenToLowestSequenceId(Map src) { - if (src == null || src.isEmpty()) { - return null; - } - Map tgt = new HashMap<>(); - for (Map.Entry entry : src.entrySet()) { - long lowestSeqId = getLowestSequenceId(entry.getValue()); - if (lowestSeqId != HConstants.NO_SEQNUM) { - tgt.put(entry.getKey(), lowestSeqId); - } - } - return tgt; - } - - /** - * @param encodedRegionName Region to flush. - * @param families Families to flush. May be a subset of all families in the region. - * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if - * we are flushing a subset of all families but there are no edits in those families not - * being flushed; in other words, this is effectively same as a flush of all of the region - * though we were passed a subset of regions. Otherwise, it returns the sequence id of the - * oldest/lowest outstanding edit. 
- */ - Long startCacheFlush(final byte[] encodedRegionName, final Set families) { - Map familytoSeq = new HashMap<>(); - for (byte[] familyName : families){ - familytoSeq.put(familyName,HConstants.NO_SEQNUM); - } - return startCacheFlush(encodedRegionName,familytoSeq); - } - - Long startCacheFlush(final byte[] encodedRegionName, final Map familyToSeq) { - Map oldSequenceIds = null; - Long lowestUnflushedInRegion = HConstants.NO_SEQNUM; - synchronized (tieLock) { - Map m = this.lowestUnflushedSequenceIds.get(encodedRegionName); - if (m != null) { - // NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled - // circumstance because another concurrent thread now may add sequenceids for this family - // (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it - // is fine because updates are blocked when this method is called. Make sure!!! - for (Map.Entry entry : familyToSeq.entrySet()) { - ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey()); - Long seqId = null; - if(entry.getValue() == HConstants.NO_SEQNUM) { - seqId = m.remove(familyNameWrapper); - } else { - seqId = m.replace(familyNameWrapper, entry.getValue()); - } - if (seqId != null) { - if (oldSequenceIds == null) { - oldSequenceIds = new HashMap<>(); - } - oldSequenceIds.put(familyNameWrapper, seqId); + void updateLowestUnflushedSequenceIds(byte[] encodedRegionName, + Map family2LowestUnflushedSequenceId) { + ConcurrentMap m = getOrCreateLowestSequenceIds(encodedRegionName); + family2LowestUnflushedSequenceId + .forEach((family, lowest) -> m.compute(new HashedBytes(family), (k, oldLowest) -> { + if (oldLowest == null) { + return lowest; } - } - if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) { - if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) { - LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) + - ", sequenceid=" + oldSequenceIds); - } - } - if (m.isEmpty()) { - // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever - // even if the region is already moved to other server. - // Do not worry about data racing, we held write lock of region when calling - // startCacheFlush, so no one can add value to the map we removed. - this.lowestUnflushedSequenceIds.remove(encodedRegionName); - } else { - // Flushing a subset of the region families. Return the sequence id of the oldest entry. - lowestUnflushedInRegion = Collections.min(m.values()); - } - } - } - // Do this check outside lock. - if (oldSequenceIds != null && oldSequenceIds.isEmpty()) { - // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either - // the region is already flushing (which would make this call invalid), or there - // were no appends after last flush, so why are we starting flush? Maybe we should - // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop. - // For now preserve old logic. - LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName)); - } - return lowestUnflushedInRegion; + // Do not use math.max here as it requires extra boxing/unboxing + // TODO: Still not sure if lowest can go backward as I do not know if the call from in + // memory compaction could be skew with the normal flush. Add a warning here if we can + // confirm that it should never go backward. + return lowest.longValue() > oldLowest.longValue() ? 
lowest : oldLowest; + })); } - void completeCacheFlush(final byte[] encodedRegionName) { - synchronized (tieLock) { - this.flushingSequenceIds.remove(encodedRegionName); - } - } - - void abortCacheFlush(final byte[] encodedRegionName) { - // Method is called when we are crashing down because failed write flush AND it is called - // if we fail prepare. The below is for the fail prepare case; we restore the old sequence ids. - Map flushing = null; - Map tmpMap = new HashMap<>(); - // Here we are moving sequenceids from flushing back to unflushed; doing opposite of what - // happened in startCacheFlush. During prepare phase, we have update lock on the region so - // no edits should be coming in via append. - synchronized (tieLock) { - flushing = this.flushingSequenceIds.remove(encodedRegionName); - if (flushing != null) { - Map unflushed = getOrCreateLowestSequenceIds(encodedRegionName); - for (Map.Entry e: flushing.entrySet()) { - // Set into unflushed the 'old' oldest sequenceid and if any value in flushed with this - // value, it will now be in tmpMap. - tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue())); - } - } - } - - // Here we are doing some 'test' to see if edits are going in out of order. What is it for? - // Carried over from old code. - if (flushing != null) { - for (Map.Entry e : flushing.entrySet()) { - Long currentId = tmpMap.get(e.getKey()); - if (currentId != null && currentId.longValue() < e.getValue().longValue()) { - String errorStr = Bytes.toString(encodedRegionName) + " family " - + e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq=" - + currentId + ", previous oldest unflushed id=" + e.getValue(); - LOG.error(errorStr); - Runtime.getRuntime().halt(1); - } - } - } + void remove(byte[] encodedRegionName) { + this.lowestUnflushedSequenceIds.remove(encodedRegionName); } /** * See if passed sequenceids are lower -- i.e. earlier -- than any outstanding * sequenceids, sequenceids we are holding on to in this accounting instance. - * @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make sense for it to + * @param sequenceIds Keyed by encoded region name. Cannot be null (doesn't make sense for it to * be null). * @return true if all sequenceids are lower, older than, the old sequenceids in this instance. */ - boolean areAllLower(Map sequenceids) { - Map flushing = null; - Map unflushed = null; - synchronized (this.tieLock) { - // Get a flattened -- only the oldest sequenceid -- copy of current flushing and unflushed - // data structures to use in tests below. 
- flushing = flattenToLowestSequenceId(this.flushingSequenceIds); - unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds); - } - for (Map.Entry e : sequenceids.entrySet()) { - long oldestFlushing = Long.MAX_VALUE; - long oldestUnflushed = Long.MAX_VALUE; - if (flushing != null && flushing.containsKey(e.getKey())) { - oldestFlushing = flushing.get(e.getKey()); - } - if (unflushed != null && unflushed.containsKey(e.getKey())) { - oldestUnflushed = unflushed.get(e.getKey()); - } - long min = Math.min(oldestFlushing, oldestUnflushed); - if (min <= e.getValue()) { - return false; + boolean areAllLower(Map sequenceIds) { + for (Map.Entry e : sequenceIds.entrySet()) { + ConcurrentMap m = this.lowestUnflushedSequenceIds.get(e.getKey()); + if (m != null) { + for (Long lowest : m.values()) { + if (lowest.longValue() <= e.getValue().longValue()) { + return false; + } + } } } return true; @@ -403,31 +191,25 @@ class SequenceIdAccounting { /** * Iterates over the given Map and compares sequence ids with corresponding entries in - * {@link #lowestUnflushedSequenceIds}. If a region in - * {@link #lowestUnflushedSequenceIds} has a sequence id less than that passed in - * sequenceids then return it. + * {@link #lowestUnflushedSequenceIds}. If a region in {@link #lowestUnflushedSequenceIds} has a + * sequence id less than that passed in sequenceids then return it. * @param sequenceids Sequenceids keyed by encoded region name. * @return regions found in this instance with sequence ids less than those passed in. */ - byte[][] findLower(Map sequenceids) { - List toFlush = null; - // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. - synchronized (tieLock) { - for (Map.Entry e : sequenceids.entrySet()) { - Map m = this.lowestUnflushedSequenceIds.get(e.getKey()); - if (m == null) { - continue; - } - // The lowest sequence id outstanding for this region. - long lowest = getLowestSequenceId(m); - if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) { - if (toFlush == null) { - toFlush = new ArrayList<>(); - } - toFlush.add(e.getKey()); + byte[][] findLower(Map sequenceIds) { + List toFlush = new ArrayList<>(); + sequenceIds.forEach((encodedRegionName, sequenceId) -> { + ConcurrentMap m = this.lowestUnflushedSequenceIds.get(encodedRegionName); + if (m == null) { + return; + } + for (long lowest : m.values()) { + if (lowest <= sequenceId.longValue()) { + toFlush.add(encodedRegionName); + return; } } - } - return toFlush == null ? null : toFlush.toArray(new byte[0][]); + }); + return toFlush.isEmpty() ? 
null : toFlush.toArray(new byte[0][]); } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java index 84d6128..e46cb45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.util; -import java.util.Arrays; - import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -55,7 +53,7 @@ public class HashedBytes { if (obj == null || getClass() != obj.getClass()) return false; HashedBytes other = (HashedBytes) obj; - return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes); + return (hashCode == other.hashCode) && Bytes.equals(bytes, other.bytes); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index b442f07..d77067a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,10 +30,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; - +import org.apache.hadoop.hbase.classification.InterfaceAudience; // imports for things that haven't moved from regionserver.wal yet. 
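For context on the SequenceIdAccounting rewrite above: with the separate flushing-ids map gone, a lowness check only has to scan each region's per-family lowest unflushed ids. A self-contained sketch of that check (the class is illustrative, not code from the patch; String keys stand in for byte[]):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative stand-in for the rewritten SequenceIdAccounting check.
public class AreAllLowerSketch {
  final ConcurrentMap<String, ConcurrentMap<String, Long>> lowestUnflushedSequenceIds =
      new ConcurrentHashMap<>();

  boolean areAllLower(Map<String, Long> sequenceIds) {
    for (Map.Entry<String, Long> e : sequenceIds.entrySet()) {
      ConcurrentMap<String, Long> m = lowestUnflushedSequenceIds.get(e.getKey());
      if (m == null) {
        continue; // nothing outstanding for this region
      }
      for (long lowest : m.values()) {
        if (lowest <= e.getValue()) {
          return false; // some family still holds an edit at or below the queried id
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    AreAllLowerSketch s = new AreAllLowerSketch();
    s.lowestUnflushedSequenceIds.computeIfAbsent("r1", k -> new ConcurrentHashMap<>()).put("cf", 5L);
    System.out.println(s.areAllLower(Map.of("r1", 4L))); // true: 5 > 4
    System.out.println(s.areAllLower(Map.of("r1", 5L))); // false: 5 <= 5
  }
}

The patch's findLower follows the same shape, collecting the matching region names instead of returning early.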
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; @@ -177,9 +175,10 @@ class DisabledWALProvider implements WALProvider { return -1; } + @Override - public void updateStore(byte[] encodedRegionName, byte[] familyName, - Long sequenceid, boolean onlyIfGreater) { return; } + public void updateStore(byte[] encodedRegionName, byte[] familyName, long sequenceId) { + } @Override public void sync() { @@ -195,23 +194,22 @@ class DisabledWALProvider implements WALProvider { sync(); } - public Long startCacheFlush(final byte[] encodedRegionName, Map - flushedFamilyNamesToSeq) { - return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet()); + @Override + public boolean startCacheFlush(byte[] encodedRegionName) { + return closed.get(); } @Override - public Long startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { - if (closed.get()) return null; - return HConstants.NO_SEQNUM; + public void completeCacheFlush(byte[] encodedRegionName, + Map family2LowestUnflushedSequenceId) { } @Override - public void completeCacheFlush(final byte[] encodedRegionName) { + public void abortCacheFlush(byte[] encodedRegionName) { } @Override - public void abortCacheFlush(byte[] encodedRegionName) { + public void closeRegion(byte[] encodedRegionName) { } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index b7adc60..0c7fd25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; import java.io.IOException; import java.util.Map; -import java.util.Set; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -118,16 +117,12 @@ public interface WAL extends Closeable { long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException; /** - * updates the seuence number of a specific store. - * depending on the flag: replaces current seq number if the given seq id is bigger, - * or even if it is lower than existing one + * updates the sequence number of a specific store. * @param encodedRegionName * @param familyName - * @param sequenceid - * @param onlyIfGreater + * @param sequenceId */ - void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, - boolean onlyIfGreater); + void updateStore(byte[] encodedRegionName, byte[] familyName, long sequenceId); /** * Sync what we have in the WAL. @@ -143,33 +138,25 @@ public interface WAL extends Closeable { void sync(long txid) throws IOException; /** - * WAL keeps track of the sequence numbers that are as yet not flushed im memstores - * in order to be able to do accounting to figure which WALs can be let go. This method tells WAL - * that some region is about to flush. The flush can be the whole region or for a column family - * of the region only. - * - *
<p>
Currently, it is expected that the update lock is held for the region; i.e. no - * concurrent appends while we set up cache flush. - * @param families Families to flush. May be a subset of all families in the region. - * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if - * we are flushing a subset of all families but there are no edits in those families not - * being flushed; in other words, this is effectively same as a flush of all of the region - * though we were passed a subset of regions. Otherwise, it returns the sequence id of the - * oldest/lowest outstanding edit. - * @see #completeCacheFlush(byte[]) - * @see #abortCacheFlush(byte[]) + * Indicate that we will start flushing the given region. + *
<p>
+ * Note that we now also track the minimum sequence id in the memstore, so the sequence id map can be updated when the flush completes by fetching the minimum sequence id of the new + * memstore directly; no sequence id needs to be returned from here. + * @param encodedRegionName + * @return true if we can start the flush processing, otherwise false. */ - Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families); - - Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> familyToSeq); + boolean startCacheFlush(byte[] encodedRegionName); /** * Complete the cache flush. - * @param encodedRegionName Encoded region name. - * @see #startCacheFlush(byte[], Set) + *
<p>
+ * @param family2LowestUnflushedSequenceId used to update the lowestUnflushedSequenceId. + * @see #startCacheFlush(byte[]) * @see #abortCacheFlush(byte[]) */ - void completeCacheFlush(final byte[] encodedRegionName); + void completeCacheFlush(final byte[] encodedRegionName, + Map<byte[], Long> family2LowestUnflushedSequenceId); /** * Abort a cache flush. Call if the flush fails. Note that the only recovery @@ -180,6 +167,12 @@ void abortCacheFlush(byte[] encodedRegionName); /** + * Indicate that a region has been closed. + *
<p>
+ * This is used to remove the sequence id mapping in SequenceIdAccounting. + */ + void closeRegion(byte[] encodedRegionName); + /** * @return Coprocessor host. */ WALCoprocessorHost getCoprocessorHost(); @@ -188,11 +181,7 @@ public interface WAL extends Closeable { * Gets the earliest unflushed sequence id in the memstore for the region. * @param encodedRegionName The region to get the number for. * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent. - * @deprecated Since version 1.2.0. Removing because not used and exposes subtle internal - * workings. Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])} */ - @VisibleForTesting - @Deprecated long getEarliestMemstoreSeqNum(byte[] encodedRegionName); /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 0be7b31..a8f0cae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -26,7 +26,9 @@ import static org.junit.Assert.assertNotEquals; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; @@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -161,18 +164,28 @@ public abstract class AbstractTestFSWAL { WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); log.append(hri, key, cols, true); + mvcc.complete(key.getWriteEntry()); } log.sync(); } /** * helper method to simulate region flush for a WAL. 
- * @param wal - * @param regionEncodedName */ - protected void flushRegion(WAL wal, byte[] regionEncodedName, Set flushedFamilyNames) { - wal.startCacheFlush(regionEncodedName, flushedFamilyNames); - wal.completeCacheFlush(regionEncodedName); + private void flushRegion(WAL wal, byte[] regionEncodedName, Set flushedFamilyNames, + MultiVersionConcurrencyControl mvcc, boolean rollDuringFlush) + throws FailedLogCloseException, IOException { + wal.startCacheFlush(regionEncodedName); + WriteEntry we = mvcc.begin(); + mvcc.completeAndWait(we); + long flushOpSeqId = we.getWriteNumber(); + Map familyToLowestUnflushedSequenceId = new HashMap<>(); + flushedFamilyNames + .forEach(family -> familyToLowestUnflushedSequenceId.put(family, flushOpSeqId)); + if (rollDuringFlush) { + wal.rollWriter(); + } + wal.completeCacheFlush(regionEncodedName, familyToLowestUnflushedSequenceId); } /** @@ -253,9 +266,12 @@ public abstract class AbstractTestFSWAL { new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); HRegionInfo hri2 = new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + wal.updateStore(hri1.getEncodedNameAsBytes(), Bytes.toBytes("row"), HConstants.NO_SEQNUM); + wal.updateStore(hri2.getEncodedNameAsBytes(), Bytes.toBytes("row"), HConstants.NO_SEQNUM); // add edits and roll the wal - MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - NavigableMap scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR); + MultiVersionConcurrencyControl mvcc1 = new MultiVersionConcurrencyControl(); + MultiVersionConcurrencyControl mvcc2 = new MultiVersionConcurrencyControl(); + NavigableMap scopes1 = new TreeMap(Bytes.BYTES_COMPARATOR); for (byte[] fam : t1.getFamiliesKeys()) { scopes1.put(fam, 0); } @@ -264,10 +280,10 @@ public abstract class AbstractTestFSWAL { scopes2.put(fam, 0); } try { - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc1, scopes1); wal.rollWriter(); // add some more edits and roll the wal. This would reach the log number threshold - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc1, scopes1); wal.rollWriter(); // with above rollWriter call, the max logs limit is reached. assertTrue(wal.getNumRolledLogFiles() == 2); @@ -278,45 +294,43 @@ public abstract class AbstractTestFSWAL { assertEquals(1, regionsToFlush.length); assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // insert edits in second region - addEdits(wal, hri2, t2, 2, mvcc, scopes2); + addEdits(wal, hri2, t2, 2, mvcc2, scopes2); // get the regions to flush, it should still read region1. regionsToFlush = wal.findRegionsToForceFlush(); assertEquals(regionsToFlush.length, 1); assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // flush region 1, and roll the wal file. Only last wal which has entries for region1 should // remain. - flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys(), mvcc1, false); wal.rollWriter(); // only one wal should remain now (that is for the second region). assertEquals(1, wal.getNumRolledLogFiles()); // flush the second region - flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys()); + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys(), mvcc2, false); wal.rollWriter(true); // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); // add edits both to region 1 and region 2, and roll. 
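One detail from the reworked addEdits helper above is worth spelling out: each append now completes its MVCC write entry, since otherwise the read point cannot advance past the edit and anything waiting on it (such as the flush path sampling the read point) would stall. A sketch of the pattern, using the HBase types the tests import at this commit (the helper class itself is illustrative):

import java.io.IOException;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

// Illustrative helper mirroring the append pattern the patch adds to the tests.
final class AppendSketch {
  static void appendAndComplete(WAL wal, HRegionInfo hri, TableName tableName,
      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, WALEdit edit)
      throws IOException {
    WALKey key =
        new WALKey(hri.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes);
    long txid = wal.append(hri, key, edit, true);
    // Complete the write entry; without this the MVCC read point cannot advance
    // past this edit and readers waiting on it would block.
    mvcc.complete(key.getWriteEntry());
    wal.sync(txid);
  }
}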
- addEdits(wal, hri1, t1, 2, mvcc, scopes1); - addEdits(wal, hri2, t2, 2, mvcc, scopes2); + addEdits(wal, hri1, t1, 2, mvcc1, scopes1); + addEdits(wal, hri2, t2, 2, mvcc2, scopes2); wal.rollWriter(); // add edits and roll the writer, to reach the max logs limit. assertEquals(1, wal.getNumRolledLogFiles()); - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc1, scopes1); wal.rollWriter(); // it should return two regions to flush, as the oldest wal file has entries // for both regions. regionsToFlush = wal.findRegionsToForceFlush(); assertEquals(2, regionsToFlush.length); // flush both regions - flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); - flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys()); + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys(), mvcc1, false); + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys(), mvcc2, false); wal.rollWriter(true); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc1, scopes1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. - wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); - wal.rollWriter(); - wal.completeCacheFlush(hri1.getEncodedNameAsBytes()); + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys(), mvcc1, true); assertEquals(1, wal.getNumRolledLogFiles()); } finally { if (wal != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index e5e7c83..674f5f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.when; import java.io.FilterInputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.lang.reflect.Field; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -37,11 +38,14 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,7 +55,20 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MemoryCompactionPolicy; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import 
org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -62,7 +79,6 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.CompactingMemStore; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; import org.apache.hadoop.hbase.regionserver.FlushRequestListener; @@ -79,6 +95,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -362,9 +379,9 @@ public abstract class AbstractTestWALReplay { final Configuration newConf = HBaseConfiguration.create(this.conf); User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString()); - user.runAs(new PrivilegedExceptionAction() { + user.runAs(new PrivilegedExceptionAction() { @Override - public Object run() throws Exception { + public Void run() throws Exception { runWALSplit(newConf); WAL wal2 = createWAL(newConf, hbaseRootDir, logName); @@ -433,9 +450,9 @@ public abstract class AbstractTestWALReplay { final Configuration newConf = HBaseConfiguration.create(this.conf); User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString()); - user.runAs(new PrivilegedExceptionAction() { + user.runAs(new PrivilegedExceptionAction() { @Override - public Object run() throws Exception { + public Void run() throws Exception { runWALSplit(newConf); WAL wal2 = createWAL(newConf, hbaseRootDir, logName); @@ -785,8 +802,10 @@ public abstract class AbstractTestWALReplay { } // Add a cache flush, shouldn't have any effect - wal.startCacheFlush(regionName, familyNames); - wal.completeCacheFlush(regionName); + wal.startCacheFlush(regionName); + wal.completeCacheFlush(regionName, + familyNames.stream().map(f -> Pair.newPair(f, mvcc.getReadPoint())) + .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond))); // Add an edit to another family, should be skipped. 
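A side note on the completeCacheFlush call just above: Collectors.toMap defaults to a HashMap, and byte[] keys hash by identity, which is harmless here because the very same arrays flow through. When keys must be looked up by content, collecting into a comparator-ordered TreeMap is safer, which is what the TestFSHLogProvider helper later in this patch does. A sketch of that idiom (hypothetical helper, built on HBase's Pair and Bytes utilities):

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

// Hypothetical helper: collect family -> lowest unflushed sequence id into a map
// ordered by byte content, so byte[] keys compare by value rather than identity.
final class FlushMapSketch {
  static Map<byte[], Long> toFlushMap(Set<byte[]> families, long lowestUnflushedSeqId) {
    return families.stream()
        .map(f -> Pair.newPair(f, lowestUnflushedSeqId))
        .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond, (a, b) -> {
          throw new IllegalStateException("family should be unique");
        }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
  }
}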
WALEdit edit = new WALEdit(); @@ -861,7 +880,7 @@ public abstract class AbstractTestWALReplay { @Test // the following test is for HBASE-6065 - public void testSequentialEditLogSeqNum() throws IOException { + public void testSequentialEditLogSeqNum() throws IOException, InterruptedException { final TableName tableName = TableName.valueOf(currentTest.getMethodName()); final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); final Path basedir = @@ -881,16 +900,24 @@ public abstract class AbstractTestWALReplay { // Let us flush the region // But this time completeflushcache is not yet done - region.flush(true); + Thread flushThread = new Thread(() -> { + try { + region.flush(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + flushThread.start(); + wal.enter.await(); for (HColumnDescriptor hcd : htd.getFamilies()) { addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); } - long lastestSeqNumber = region.getReadPoint(null); // get the current seq no - wal.doCompleteCacheFlush = true; + long lastestSeqNumber = region.getReadPoint(null); // allow complete cache flush with the previous seq number got after first // set of edits. - wal.completeCacheFlush(hri.getEncodedNameAsBytes()); + wal.run.countDown(); + flushThread.join(); wal.shutdown(); FileStatus[] listStatus = wal.getFiles(); assertNotNull(listStatus); @@ -1070,7 +1097,10 @@ public abstract class AbstractTestWALReplay { } static class MockWAL extends FSHLog { - boolean doCompleteCacheFlush = false; + + private final CountDownLatch enter = new CountDownLatch(1); + + private final CountDownLatch run = new CountDownLatch(1); public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) throws IOException { @@ -1078,11 +1108,15 @@ public abstract class AbstractTestWALReplay { } @Override - public void completeCacheFlush(byte[] encodedRegionName) { - if (!doCompleteCacheFlush) { - return; + public void completeCacheFlush(byte[] encodedRegionName, + Map family2LowestUnflushedSequenceId) { + enter.countDown(); + try { + run.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - super.completeCacheFlush(encodedRegionName); + super.completeCacheFlush(encodedRegionName, family2LowestUnflushedSequenceId); } } @@ -1166,8 +1200,9 @@ public abstract class AbstractTestWALReplay { final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, NavigableMap scopes) throws IOException { for (int j = 0; j < count; j++) { - wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), - createWALEdit(rowName, family, ee, j), true); + WALKey key = createWALKey(tableName, hri, mvcc, scopes); + wal.append(hri, key, createWALEdit(rowName, family, ee, j), true); + mvcc.complete(key.getWriteEntry()); } wal.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 8847c4c..4d2f95f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WALKey; import org.junit.Rule; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; @@ -120,6 +121,7 @@ public class TestFSHLog extends 
AbstractTestFSWAL { /** * Test case for https://issues.apache.org/jira/browse/HBASE-16721 */ + @Ignore @Test (timeout = 30000) public void testUnflushedSeqIdTracking() throws IOException, InterruptedException { final String name = this.name.getMethodName(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSWALEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSWALEntry.java deleted file mode 100644 index cf059dd..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSWALEntry.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import java.util.ArrayList; -import java.util.List; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import static org.junit.Assert.assertEquals; -import org.junit.Test; -import org.junit.experimental.categories.Category; - - -@Category({ RegionServerTests.class, SmallTests.class }) -public class TestFSWALEntry { - @Test - public void testCollectFamilies() { - byte[] family0 = Bytes.toBytes("family0"); - byte[] family1 = Bytes.toBytes("family1"); - byte[] family2 = Bytes.toBytes("family2"); - - List cells = new ArrayList<>(); - assertEquals(0, FSWALEntry.collectFamilies(cells).size()); - - cells.add(CellUtil.createCell(family0, family0, family0)); - assertEquals(1, FSWALEntry.collectFamilies(cells).size()); - - cells.add(CellUtil.createCell(family1, family1, family1)); - assertEquals(2, FSWALEntry.collectFamilies(cells).size()); - - cells.add(CellUtil.createCell(family0, family0, family0)); - cells.add(CellUtil.createCell(family1, family1, family1)); - assertEquals(2, FSWALEntry.collectFamilies(cells).size()); - - cells.add(CellUtil.createCell(family2, family2, family2)); - assertEquals(3, FSWALEntry.collectFamilies(cells).size()); - - cells.add(CellUtil.createCell(WALEdit.METAFAMILY, WALEdit.METAFAMILY, WALEdit.METAFAMILY)); - assertEquals(3, FSWALEntry.collectFamilies(cells).size()); - } -} - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java index 9f5acbd..0ba92f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java @@ -17,16 +17,19 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import 
static org.apache.hadoop.hbase.HConstants.NO_SEQNUM; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; @@ -34,99 +37,56 @@ import org.junit.experimental.categories.Category; @Category(SmallTests.class) public class TestSequenceIdAccounting { - private static final byte [] ENCODED_REGION_NAME = Bytes.toBytes("r"); - private static final byte [] FAMILY_NAME = Bytes.toBytes("cf"); - private static final Set FAMILIES; - static { - FAMILIES = new HashSet<>(); - FAMILIES.add(FAMILY_NAME); - } + private static final byte[] ENCODED_REGION_NAME = Bytes.toBytes("r"); + private static final byte[] FAMILY1 = Bytes.toBytes("cf1"); + private static final byte[] FAMILY2 = Bytes.toBytes("cf2"); + private static final Set FAMILIES = ImmutableSet.of(FAMILY1, FAMILY2); @Test - public void testStartCacheFlush() { + public void testUpdateLowestUnflushedSequenceIds() { SequenceIdAccounting sida = new SequenceIdAccounting(); - sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); - Map m = new HashMap<>(); - m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); - assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES)); - sida.completeCacheFlush(ENCODED_REGION_NAME); - long sequenceid = 1; - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); - // Only one family so should return NO_SEQNUM still. - assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES)); - sida.completeCacheFlush(ENCODED_REGION_NAME); - long currentSequenceId = sequenceid; - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); - final Set otherFamily = new HashSet<>(1); - otherFamily.add(Bytes.toBytes("otherCf")); - sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); - // Should return oldest sequence id in the region. 
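The replacement test below asserts that a region's lowest unflushed sequence id is the minimum across its per-family entries (cf1=3, cf2=2 yields 2). That reduction, as a JDK-only sketch (the class is illustrative; NO_SEQNUM mirrors HConstants.NO_SEQNUM, which is -1):

import java.util.Map;

// Illustrative reduction: region lowest = minimum of the per-family lowest ids.
final class LowestAcrossFamilies {
  static final long NO_SEQNUM = -1L;

  static long lowestSequenceId(Map<String, Long> familyToLowest) {
    return familyToLowest.values().stream().mapToLong(Long::longValue).min().orElse(NO_SEQNUM);
  }

  public static void main(String[] args) {
    System.out.println(lowestSequenceId(Map.of("cf1", 3L, "cf2", 2L))); // prints 2
  }
}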
- assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily)); - sida.completeCacheFlush(ENCODED_REGION_NAME); + assertEquals(NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + FAMILIES.forEach( + family -> assertEquals(NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME, family))); + + sida.updateLowestUnflushedSequenceIds(ENCODED_REGION_NAME, ImmutableMap.of(FAMILY1, 1L)); + assertEquals(1L, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + assertEquals(1L, sida.getLowestSequenceId(ENCODED_REGION_NAME, FAMILY1)); + assertEquals(NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME, FAMILY2)); + + sida.updateLowestUnflushedSequenceIds(ENCODED_REGION_NAME, + ImmutableMap.of(FAMILY1, 3L, FAMILY2, 2L)); + assertEquals(2L, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + assertEquals(3L, sida.getLowestSequenceId(ENCODED_REGION_NAME, FAMILY1)); + assertEquals(2L, sida.getLowestSequenceId(ENCODED_REGION_NAME, FAMILY2)); } @Test public void testAreAllLower() { SequenceIdAccounting sida = new SequenceIdAccounting(); - sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); - Map m = new HashMap<>(); - m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); - assertTrue(sida.areAllLower(m)); - long sequenceid = 1; - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + Map m = new HashMap(); + m.put(ENCODED_REGION_NAME, NO_SEQNUM); assertTrue(sida.areAllLower(m)); - m.put(ENCODED_REGION_NAME, sequenceid); - assertFalse(sida.areAllLower(m)); - long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); - assertEquals("Lowest should be first sequence id inserted", 1, lowest); - m.put(ENCODED_REGION_NAME, lowest); - assertFalse(sida.areAllLower(m)); - // Now make sure above works when flushing. - sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); - assertFalse(sida.areAllLower(m)); - m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + long sequenceId = 1L; + sida.updateLowestUnflushedSequenceIds(ENCODED_REGION_NAME, + ImmutableMap.of(FAMILY1, sequenceId, FAMILY2, sequenceId)); assertTrue(sida.areAllLower(m)); - // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits - sida.completeCacheFlush(ENCODED_REGION_NAME); - m.put(ENCODED_REGION_NAME, sequenceid); - assertTrue(sida.areAllLower(m)); - // Flush again but add sequenceids while we are flushing. - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); - lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); - m.put(ENCODED_REGION_NAME, lowest); + m.put(ENCODED_REGION_NAME, sequenceId); assertFalse(sida.areAllLower(m)); - sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); - // The cache flush will clear out all sequenceid accounting by region. - assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); - sida.completeCacheFlush(ENCODED_REGION_NAME); - // No new edits have gone in so no sequenceid to work with. - assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); - // Make an edit behind all we'll put now into sida. 
- m.put(ENCODED_REGION_NAME, sequenceid); - sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); - assertTrue(sida.areAllLower(m)); } @Test public void testFindLower() { SequenceIdAccounting sida = new SequenceIdAccounting(); - sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); - Map m = new HashMap<>(); - m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); - long sequenceid = 1; - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); - sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); - assertTrue(sida.findLower(m) == null); - m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME)); - assertTrue(sida.findLower(m).length == 1); - m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1); - assertTrue(sida.findLower(m) == null); + Map m = new HashMap(); + m.put(ENCODED_REGION_NAME, NO_SEQNUM); + long sequenceId = 2; + sida.updateLowestUnflushedSequenceIds(ENCODED_REGION_NAME, + ImmutableMap.of(FAMILY1, sequenceId, FAMILY2, sequenceId)); + assertNull(sida.findLower(m)); + m.put(ENCODED_REGION_NAME, sequenceId); + assertEquals(1, sida.findLower(m).length); + m.put(ENCODED_REGION_NAME, sequenceId - 1); + assertNull(sida.findLower(m)); } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java index f752735..309c8af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java @@ -25,10 +25,13 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.HashSet; +import java.util.Map; import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -60,7 +64,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -@Category({RegionServerTests.class, MediumTests.class}) +@Category({ RegionServerTests.class, MediumTests.class }) public class TestFSHLogProvider { private static final Log LOG = LogFactory.getLog(TestFSHLogProvider.class); @@ -95,12 +99,9 @@ public class TestFSHLogProvider { TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); // faster failover with cluster.shutdown();fs.close() idiom - TEST_UTIL.getConfiguration() - .setInt("hbase.ipc.client.connect.max.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "dfs.client.block.recovery.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "hbase.ipc.client.connection.maxidletime", 500); + TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1); + TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); + 
TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500); TEST_UTIL.startMiniDFSCluster(3); // Set up a working space for our tests. @@ -121,13 +122,13 @@ public class TestFSHLogProvider { @Test public void testGetServerNameFromWALDirectoryName() throws IOException { ServerName sn = ServerName.valueOf("hn", 450, 1398); - String hl = FSUtils.getRootDir(conf) + "/" + - AbstractFSWALProvider.getWALDirectoryName(sn.toString()); + String hl = + FSUtils.getRootDir(conf) + "/" + AbstractFSWALProvider.getWALDirectoryName(sn.toString()); // Must not throw exception assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, null)); assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, - FSUtils.getRootDir(conf).toUri().toString())); + FSUtils.getRootDir(conf).toUri().toString())); assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, "")); assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, " ")); assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl)); @@ -137,30 +138,32 @@ public class TestFSHLogProvider { final String wals = "/WALs/"; ServerName parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, FSUtils.getRootDir(conf).toUri().toString() + wals + sn + - "/localhost%2C32984%2C1343316388997.1343316390417"); - assertEquals("standard", sn, parsed); + "/localhost%2C32984%2C1343316388997.1343316390417"); + assertEquals("standard", sn, parsed); parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf"); assertEquals("subdir", sn, parsed); parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, FSUtils.getRootDir(conf).toUri().toString() + wals + sn + - "-splitting/localhost%3A57020.1340474893931"); + "-splitting/localhost%3A57020.1340474893931"); assertEquals("split", sn, parsed); } - - protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, - int times, NavigableMap scopes) throws IOException { + private long addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times, + NavigableMap scopes) throws IOException { + long sequenceId = HConstants.NO_SEQNUM; final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), - cols, true); + WALKey key = getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes); + log.append(hri, key, cols, true); + sequenceId = key.getSequenceId(); } log.sync(); + return sequenceId; } /** @@ -175,78 +178,82 @@ public class TestFSHLogProvider { /** * helper method to simulate region flush for a WAL. 
* @param wal - * @param regionEncodedName + * @param encodedRegionName */ - protected void flushRegion(WAL wal, byte[] regionEncodedName, Set flushedFamilyNames) { - wal.startCacheFlush(regionEncodedName, flushedFamilyNames); - wal.completeCacheFlush(regionEncodedName); + private void flushRegion(WAL wal, byte[] encodedRegionName, Set flushedFamilyNames, + long sequenceId) { + wal.startCacheFlush(encodedRegionName); + Map family2LowestUnflushedSequenceId = + flushedFamilyNames.stream().map(f -> Pair.newPair(f, sequenceId)) + .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond, (u, v) -> { + throw new IllegalStateException("family should be unique"); + }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR))); + wal.completeCacheFlush(encodedRegionName, family2LowestUnflushedSequenceId); } - private static final byte[] UNSPECIFIED_REGION = new byte[]{}; + private static final byte[] UNSPECIFIED_REGION = new byte[] {}; @Test public void testLogCleaning() throws Exception { LOG.info(currentTest.getMethodName()); final HTableDescriptor htd = - new HTableDescriptor(TableName.valueOf(currentTest.getMethodName())).addFamily(new HColumnDescriptor( - "row")); + new HTableDescriptor(TableName.valueOf(currentTest.getMethodName())) + .addFamily(new HColumnDescriptor("row")); final HTableDescriptor htd2 = new HTableDescriptor(TableName.valueOf(currentTest.getMethodName() + "2")) .addFamily(new HColumnDescriptor("row")); - NavigableMap scopes1 = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : htd.getFamiliesKeys()) { - scopes1.put(fam, 0); + NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); } - NavigableMap scopes2 = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : htd2.getFamiliesKeys()) { + NavigableMap scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] fam : htd2.getFamiliesKeys()) { scopes2.put(fam, 0); } final Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); try { - HRegionInfo hri = new HRegionInfo(htd.getTableName(), - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(), - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HRegionInfo hri = + new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + // we want to mix edits from regions, so pick our own identifier. 
final WAL log = wals.getWAL(UNSPECIFIED_REGION, null); - + htd.getFamiliesKeys().forEach(f -> log.updateStore(hri.getEncodedNameAsBytes(), f, 0)); + htd2.getFamiliesKeys().forEach(f -> log.updateStore(hri2.getEncodedNameAsBytes(), f, 0)); // Add a single edit and make sure that rolling won't remove the file // Before HBASE-3198 it used to delete it - addEdits(log, hri, htd, 1, scopes1); + long seqId = addEdits(log, hri, htd, 1, scopes); log.rollWriter(); assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log)); // See if there's anything wrong with more than 1 edit - addEdits(log, hri, htd, 2, scopes1); + seqId = addEdits(log, hri, htd, 2, scopes); log.rollWriter(); assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log)); // Now mix edits from 2 regions, still no flushing - addEdits(log, hri, htd, 1, scopes1); - addEdits(log, hri2, htd2, 1, scopes2); - addEdits(log, hri, htd, 1, scopes1); - addEdits(log, hri2, htd2, 1, scopes2); + seqId = addEdits(log, hri, htd, 1, scopes); + long seqId2 = addEdits(log, hri2, htd2, 1, scopes2); + seqId = addEdits(log, hri, htd, 1, scopes); + seqId2 = addEdits(log, hri2, htd2, 1, scopes2); log.rollWriter(); assertEquals(3, AbstractFSWALProvider.getNumRolledLogFiles(log)); // Flush the first region, we expect to see the first two files getting // archived. We need to append something or writer won't be rolled. - addEdits(log, hri2, htd2, 1, scopes2); - log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys()); - log.completeCacheFlush(hri.getEncodedNameAsBytes()); + seqId2 = addEdits(log, hri2, htd2, 1, scopes2); + flushRegion(log, hri.getEncodedNameAsBytes(), htd.getFamiliesKeys(), seqId + 1); log.rollWriter(); assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(log)); // Flush the second region, which removes all the remaining output files // since the oldest was completely flushed and the two others only contain // flush information - addEdits(log, hri2, htd2, 1, scopes2); - log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys()); - log.completeCacheFlush(hri2.getEncodedNameAsBytes()); + seqId2 = addEdits(log, hri2, htd2, 1, scopes2); + flushRegion(log, hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys(), seqId2 + 1); log.rollWriter(); assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log)); } finally { @@ -257,14 +264,13 @@ public class TestFSHLogProvider { } /** - * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs - * and also don't archive "live logs" (that is, a log with un-flushed entries). + * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and + * also don't archive "live logs" (that is, a log with un-flushed entries). *
<p>
- * This is what it does: - * It creates two regions, and does a series of inserts along with log rolling. - * Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is eligible for - * archiving if for all the regions which have entries in that wal file, have flushed - past - * their maximum sequence id in that wal file. + * This is what it does: It creates two regions, and does a series of inserts along with log + * rolling. Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is + * eligible for archiving if, for all the regions which have entries in that wal file, those + * regions have flushed past their maximum sequence id in that wal file. *
<p>
* @throws IOException */ @@ -272,17 +278,17 @@ public class TestFSHLogProvider { public void testWALArchiving() throws IOException { LOG.debug(currentTest.getMethodName()); HTableDescriptor table1 = - new HTableDescriptor(TableName.valueOf(currentTest.getMethodName() + "1")).addFamily(new HColumnDescriptor("row")); + new HTableDescriptor(TableName.valueOf(currentTest.getMethodName() + "1")) + .addFamily(new HColumnDescriptor("row")); HTableDescriptor table2 = - new HTableDescriptor(TableName.valueOf(currentTest.getMethodName() + "2")).addFamily(new HColumnDescriptor("row")); - NavigableMap scopes1 = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : table1.getFamiliesKeys()) { + new HTableDescriptor(TableName.valueOf(currentTest.getMethodName() + "2")) + .addFamily(new HColumnDescriptor("row")); + NavigableMap scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] fam : table1.getFamiliesKeys()) { scopes1.put(fam, 0); } - NavigableMap scopes2 = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : table2.getFamiliesKeys()) { + NavigableMap scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] fam : table2.getFamiliesKeys()) { scopes2.put(fam, 0); } final Configuration localConf = new Configuration(conf); @@ -291,18 +297,19 @@ public class TestFSHLogProvider { try { final WAL wal = wals.getWAL(UNSPECIFIED_REGION, null); assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - HRegionInfo hri1 = - new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW); - HRegionInfo hri2 = - new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW); + HRegionInfo hri1 = new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + HRegionInfo hri2 = new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); // ensure that we don't split the regions. hri1.setSplit(false); hri2.setSplit(false); + // init sequence id accounting + table1.getFamiliesKeys().forEach(f -> wal.updateStore(hri1.getEncodedNameAsBytes(), f, 0)); + table2.getFamiliesKeys().forEach(f -> wal.updateStore(hri2.getEncodedNameAsBytes(), f, 0)); // variables to mock region sequenceIds. // start with the testing logic: insert a waledit, and roll writer - addEdits(wal, hri1, table1, 1, scopes1); + long seqId1 = addEdits(wal, hri1, table1, 1, scopes1); wal.rollWriter(); // assert that the wal is rolled assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal)); @@ -312,22 +319,22 @@ public class TestFSHLogProvider { // assert that the wal is rolled assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); // add a waledit to table1, and flush the region. - addEdits(wal, hri1, table1, 3, scopes1); - flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys()); + seqId1 = addEdits(wal, hri1, table1, 3, scopes1); + flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys(), seqId1 + 1); // roll log; all old logs should be archived. 
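The archiving rule this test exercises can be stated compactly: a rolled WAL may be archived once every region with entries in it has flushed past the highest sequence id that WAL holds for the region. An illustrative sketch of that predicate (String keys; not the FSHLog implementation):

import java.util.Map;

// Sketch of the archiving check described in the test javadoc above.
final class WalArchivingSketch {
  static boolean canArchive(Map<String, Long> highestSeqIdInWal,
      Map<String, Long> lowestUnflushedSeqId) {
    return highestSeqIdInWal.entrySet().stream().allMatch(e -> {
      Long lowest = lowestUnflushedSeqId.get(e.getKey());
      return lowest != null && lowest.longValue() > e.getValue().longValue();
    });
  }

  public static void main(String[] args) {
    System.out.println(canArchive(Map.of("r1", 10L), Map.of("r1", 11L))); // true
    System.out.println(canArchive(Map.of("r1", 10L), Map.of("r1", 9L)));  // false
  }
}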
wal.rollWriter(); assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); // add an edit to table2, and roll writer - addEdits(wal, hri2, table2, 1, scopes2); + long seqId2 = addEdits(wal, hri2, table2, 1, scopes2); wal.rollWriter(); assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal)); // add edits for table1, and roll writer - addEdits(wal, hri1, table1, 2, scopes1); + seqId1 = addEdits(wal, hri1, table1, 2, scopes1); wal.rollWriter(); assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); // add edits for table2, and flush hri1. - addEdits(wal, hri2, table2, 2, scopes2); - flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys()); + seqId2 = addEdits(wal, hri2, table2, 2, scopes2); + flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys(), seqId1 + 1); // the log : region-sequenceId map is // log1: region2 (unflushed) // log2: region1 (flushed) @@ -336,8 +343,8 @@ public class TestFSHLogProvider { wal.rollWriter(); assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); // flush region2, and all logs should be archived. - addEdits(wal, hri2, table2, 2, scopes2); - flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys()); + seqId2 = addEdits(wal, hri2, table2, 2, scopes2); + flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys(), seqId2 + 1); wal.rollWriter(); assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); } finally { @@ -355,9 +362,9 @@ public class TestFSHLogProvider { public void testConcurrentWrites() throws Exception { // Run the WPE tool with three threads writing 3000 edits each concurrently. // When done, verify that all edits were written. - int errCode = WALPerformanceEvaluation. - innerMain(new Configuration(TEST_UTIL.getConfiguration()), - new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); + int errCode = + WALPerformanceEvaluation.innerMain(new Configuration(TEST_UTIL.getConfiguration()), + new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", "3000" }); assertEquals(0, errCode); } @@ -373,10 +380,11 @@ public class TestFSHLogProvider { final Set seen = new HashSet<>(1); final Random random = new Random(); assertTrue("first attempt to add WAL from default provider should work.", - seen.add(wals.getWAL(Bytes.toBytes(random.nextInt()), null))); + seen.add(wals.getWAL(Bytes.toBytes(random.nextInt()), null))); for (int i = 0; i < 1000; i++) { - assertFalse("default wal provider is only supposed to return a single wal, which should " - + "compare as .equals itself.", + assertFalse( + "default wal provider is only supposed to return a single wal, which should " + + "compare as .equals itself.", seen.add(wals.getWAL(Bytes.toBytes(random.nextInt()), null))); } } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index f02e244..5eb2e5e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -190,18 +190,20 @@ public class TestWALFactory { for (int i = 0; i < howmany; i++) { final WAL log = wals.getWAL(infos[i].getEncodedNameAsBytes(), infos[i].getTable().getNamespace()); + log.updateStore(infos[i].getEncodedNameAsBytes(), Bytes.toBytes("column"), + HConstants.NO_SEQNUM); for (int j = 0; j < howmany; j++) { WALEdit edit = new WALEdit(); - byte [] family = 
Bytes.toBytes("column"); - byte [] qualifier = Bytes.toBytes(Integer.toString(j)); - byte [] column = Bytes.toBytes("column:" + Integer.toString(j)); + byte[] family = Bytes.toBytes("column"); + byte[] qualifier = Bytes.toBytes(Integer.toString(j)); + byte[] column = Bytes.toBytes("column:" + Integer.toString(j)); edit.add(new KeyValue(rowName, family, qualifier, System.currentTimeMillis(), column)); LOG.info("Region " + i + ": " + edit); WALKey walKey = new WALKey(infos[i].getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes); log.append(infos[i], walKey, edit, true); - walKey.getWriteEntry(); + mvcc.complete(walKey.getWriteEntry()); } log.sync(); log.rollWriter(true); @@ -531,8 +533,6 @@ public class TestWALFactory { mvcc, scopes), cols, true); log.sync(txid); - log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys()); - log.completeCacheFlush(info.getEncodedNameAsBytes()); log.shutdown(); Path filename = AbstractFSWALProvider.getCurrentFileName(log); // Now open a reader on the log and assert append worked. @@ -594,8 +594,6 @@ public class TestWALFactory { mvcc, scopes), cols, true); log.sync(txid); - log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys()); - log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.shutdown(); Path filename = AbstractFSWALProvider.getCurrentFileName(log); // Now open a reader on the log and assert append worked. -- 2.7.4
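Taken together, the WAL interface changes in this patch reduce the flush protocol to the shape below. A minimal usage sketch against the new signatures (the driver class and the flush-work stub are illustrative, not code from the patch):

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.wal.WAL;

// Illustrative driver for the revised flush contract.
final class FlushProtocolSketch {
  static void flush(WAL wal, byte[] encodedRegionName,
      Map<byte[], Long> family2LowestUnflushedSequenceId) throws IOException {
    if (!wal.startCacheFlush(encodedRegionName)) {
      return; // WAL says the flush cannot start (e.g. it is closing)
    }
    try {
      writeMemstoreSnapshots(); // stand-in for flushing snapshots to store files
      // Report the minimum sequence id remaining in each family's memstore so the
      // WAL can update its lowest-unflushed accounting and release old log files.
      wal.completeCacheFlush(encodedRegionName, family2LowestUnflushedSequenceId);
    } catch (IOException e) {
      wal.abortCacheFlush(encodedRegionName); // restore accounting on failure
      throw e;
    }
  }

  private static void writeMemstoreSnapshots() throws IOException {
    // placeholder for the real flush-to-storefile work
  }
}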