From 97ab1eb7df8aef7eeece5e23f11254368c443cf5 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 8 Jun 2015 22:54:40 +0800 Subject: [PATCH] HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS --- .../apache/hadoop/hbase/regionserver/HRegion.java | 28 ++-- .../hadoop/hbase/regionserver/HRegionServer.java | 19 +-- .../hadoop/hbase/regionserver/wal/FSHLog.java | 42 ++++-- .../hadoop/hbase/wal/DisabledWALProvider.java | 11 +- .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 46 ++++--- .../hbase/regionserver/TestSplitWalDataLoss.java | 149 +++++++++++++++++++++ 6 files changed, 233 insertions(+), 62 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0d5306e..0e83dd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -217,13 +217,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final AtomicBoolean closing = new AtomicBoolean(false); /** - * The max sequence id of flushed data on this region. Used doing some rough calculations on - * whether time to flush or not. + * The max sequence id of flushed data on this region. There is no edit in memory that is + * less than this sequence id. */ private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM; /** - * Record the sequence id of last flush operation. + * Record the sequence id of last flush operation. Can be in advance of + * {@link #maxFlushedSeqId} when flushing a single column family. In this case, + * {@link #maxFlushedSeqId} will be older than the oldest edit in memory. 
*/ private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM; /** @@ -1619,7 +1621,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi .setSequenceId( oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build()); } - return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId); + return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId()); } ////////////////////////////////////////////////////////////////////////////// @@ -2132,21 +2134,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { w = mvcc.beginMemstoreInsert(); if (wal != null) { - if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) { - // This should never happen. - String msg = "Flush will not be started for [" - + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; + Long earliestUnflushedSequenceIdForTheRegion = + wal.startCacheFlush(encodedRegionName, flushedFamilyNames); + if (earliestUnflushedSequenceIdForTheRegion == null) { + // This should never happen. This is how startCacheFlush signals flush cannot proceed. + String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; status.setStatus(msg); return new PrepareFlushResult( new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false), myseqid); } flushOpSeqId = getNextSequenceId(wal); - long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName); - // no oldestUnflushedSeqId means we flushed all stores. - // or the unflushed stores are all empty. - flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId - : oldestUnflushedSeqId - 1; + // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit + flushedSeqId = + earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM? 
+ flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1; } else { // use the provided sequence Id as WAL is not being used for this flush. flushedSeqId = flushOpSeqId = myseqid; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8f4059e..3664ffb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2835,22 +2835,15 @@ public class HRegionServer extends HasThread implements Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); if (destination != null) { - try { - WAL wal = getWAL(r.getRegionInfo()); - long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes()); + long closeSeqNum = r.getMaxFlushedSeqId(); + if (closeSeqNum == HConstants.NO_SEQNUM) { + // No edits in WAL for this region; get the sequence number when the region was opened. + closeSeqNum = r.getOpenSeqNum(); if (closeSeqNum == HConstants.NO_SEQNUM) { - // No edits in WAL for this region; get the sequence number when the region was opened. 
- closeSeqNum = r.getOpenSeqNum(); - if (closeSeqNum == HConstants.NO_SEQNUM) { - closeSeqNum = 0; - } + closeSeqNum = 0; } - addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum); - } catch (IOException exception) { - LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() + - "; not adding to moved regions."); - LOG.debug("Exception details for failure to get wal", exception); } + addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum); } this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); return toReturn != null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 7c170b0..76e93a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -28,6 +28,7 @@ import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -850,7 +851,12 @@ public class FSHLog implements WAL { // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. 
synchronized (regionSequenceIdLock) { for (Map.Entry e: regionsSequenceNums.entrySet()) { - long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey()); + ConcurrentMap m = + this.oldestUnflushedStoreSequenceIds.get(e.getKey()); + if (m == null) { + continue; + } + long unFlushedVal = Collections.min(m.values()); if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) { if (regionsToFlush == null) regionsToFlush = new ArrayList(); @@ -1631,14 +1637,15 @@ public class FSHLog implements WAL { } @Override - public boolean startCacheFlush(final byte[] encodedRegionName, + public Long startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { Map oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); if (!closeBarrier.beginOp()) { LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) + " - because the server is closing."); - return false; + return null; } + long oldestUnflushedSequenceId = HConstants.NO_SEQNUM; synchronized (regionSequenceIdLock) { ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = oldestUnflushedStoreSequenceIds.get(encodedRegionName); @@ -1661,6 +1668,9 @@ public class FSHLog implements WAL { // Do not worry about data racing, we held write lock of region when calling // startCacheFlush, so no one can add value to the map we removed. 
oldestUnflushedStoreSequenceIds.remove(encodedRegionName); + } else { + oldestUnflushedSequenceId = + Collections.min(oldestUnflushedStoreSequenceIdsOfRegion.values()); } } } @@ -1673,7 +1683,7 @@ public class FSHLog implements WAL { LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" + Bytes.toString(encodedRegionName) + "]"); } - return true; + return oldestUnflushedSequenceId; } @Override @@ -1759,7 +1769,6 @@ public class FSHLog implements WAL { WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); } - @Override public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = @@ -1771,14 +1780,23 @@ public class FSHLog implements WAL { @Override public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); - if (oldestUnflushedStoreSequenceIdsOfRegion != null) { - Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); - return result != null ? 
result.longValue() : HConstants.NO_SEQNUM; - } else { - return HConstants.NO_SEQNUM; + synchronized (regionSequenceIdLock) { + Map m = this.lowestFlushingStoreSequenceIds.get(encodedRegionName); + if (m != null) { + Long earlist = m.get(familyName); + if (earlist != null) { + return earlist; + } + } + m = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); + if (m != null) { + Long earlist = m.get(familyName); + if (earlist != null) { + return earlist; + } + } } + return HConstants.NO_SEQNUM; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 7254ad1..56d17a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -26,21 +26,19 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.util.FSUtils; - +import org.apache.hadoop.hbase.classification.InterfaceAudience; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.FSUtils; /** * No-op implementation of {@link WALProvider} used when the WAL is disabled. 
@@ -187,8 +185,9 @@ class DisabledWALProvider implements WALProvider { } @Override - public boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { - return !(closed.get()); + public Long startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { + if (closed.get()) return null; + return HConstants.NO_SEQNUM; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08d..473bba9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -25,12 +25,12 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; - +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; // imports we use from yet-to-be-moved regionsever.wal import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import com.google.common.annotations.VisibleForTesting; + /** * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides * APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc). 
@@ -140,20 +142,23 @@ public interface WAL { void sync(long txid) throws IOException; /** - * WAL keeps track of the sequence numbers that were not yet flushed from memstores - * in order to be able to do cleanup. This method tells WAL that some region is about - * to flush memstore. - * - *

We stash the oldest seqNum for the region, and let the the next edit inserted in this - * region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit, - * AtomicLong, boolean, List)} as new oldest seqnum. - * In case of flush being aborted, we put the stashed value back; in case of flush succeeding, - * the seqNum of that first edit after start becomes the valid oldest seqNum for this region. + * WAL keeps track of the sequence numbers that are as yet not flushed in memstores + * in order to be able to do accounting to figure which WALs can be let go. This method tells WAL + * that some region is about to flush. The flush can be the whole region or for a column family + * of the region only. * - * @return true if the flush can proceed, false in case wal is closing (ususally, when server is - * closing) and flush couldn't be started. + *

Currently, it is expected that the update lock is held for the region; i.e. no + * concurrent appends while we set up cache flush. + * @param flushedFamilyNames Families to flush. May be a subset of all families in the region. + * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if + * we are flushing a subset of all families but there are no edits in those families not + * being flushed; in other words, this is effectively the same as a flush of all of the region + * though we were passed a subset of families. Otherwise, it returns the sequence id of the + * oldest/lowest outstanding edit. Returns null if the WAL is closing and the flush cannot be started. + * @see #completeCacheFlush(byte[]) + * @see #abortCacheFlush(byte[]) */ - boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames); + Long startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames); /** * Complete the cache flush. @@ -175,11 +180,16 @@ public interface WAL { WALCoprocessorHost getCoprocessorHost(); - /** Gets the earliest sequence number in the memstore for this particular region. - * This can serve as best-effort "recent" WAL number for this region. + /** + * Gets the earliest unflushed sequence id in the memstore for the region. * @param encodedRegionName The region to get the number for. - * @return The number if present, HConstants.NO_SEQNUM if absent. + * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent. + * @deprecated Since version 1.1.1. Removing because not used and exposes subtle internal + * workings. 
Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])} */ + @VisibleForTesting + @Deprecated long getEarliestMemstoreSeqNum(byte[] encodedRegionName); /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java new file mode 100644 index 0000000..be201ea --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.hbase.DroppedSnapshotException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mortbay.log.Log; + +/** + * Testcase for https://issues.apache.org/jira/browse/HBASE-13811 + */ +@Category({ MediumTests.class }) +public class TestSplitWalDataLoss { + + private final HBaseTestingUtility testUtil = new HBaseTestingUtility(); + + private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName()) + 
.build(); + + private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss"); + + private byte[] family = Bytes.toBytes("f"); + + private byte[] qualifier = Bytes.toBytes("q"); + + @Before + public void setUp() throws Exception { + testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000); + testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + testUtil.startMiniCluster(2); + HBaseAdmin admin = testUtil.getHBaseAdmin(); + admin.createNamespace(namespace); + admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family))); + testUtil.waitTableAvailable(tableName); + } + + @After + public void tearDown() throws Exception { + testUtil.shutdownMiniCluster(); + } + + @Test + public void test() throws IOException, InterruptedException { + final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName); + final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0); + HRegion spiedRegion = spy(region); + final MutableBoolean flushed = new MutableBoolean(false); + final MutableBoolean reported = new MutableBoolean(false); + doAnswer(new Answer() { + @Override + public FlushResult answer(InvocationOnMock invocation) throws Throwable { + synchronized (flushed) { + flushed.setValue(true); + flushed.notifyAll(); + } + synchronized (reported) { + while (!reported.booleanValue()) { + reported.wait(); + } + } + rs.getWAL(region.getRegionInfo()).abortCacheFlush( + region.getRegionInfo().getEncodedNameAsBytes()); + throw new DroppedSnapshotException("testcase"); + } + }).when(spiedRegion).internalFlushCacheAndCommit(Matchers. any(), + Matchers. any(), Matchers. 
any(), + Matchers.> any()); + rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion); + Connection conn = testUtil.getConnection(); + + try (Table table = conn.getTable(tableName)) { + table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0"))); + } + long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); + Log.info("CHANGE OLDEST " + oldestSeqIdOfStore); + assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); + rs.cacheFlusher.requestFlush(spiedRegion, false); + synchronized (flushed) { + while (!flushed.booleanValue()) { + flushed.wait(); + } + } + try (Table table = conn.getTable(tableName)) { + table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1"))); + } + long now = EnvironmentEdgeManager.currentTime(); + rs.tryRegionServerReport(now - 500, now); + synchronized (reported) { + reported.setValue(true); + reported.notifyAll(); + } + while (testUtil.getRSForFirstRegionInTable(tableName) == rs) { + Thread.sleep(100); + } + try (Table table = conn.getTable(tableName)) { + Result result = table.get(new Get(Bytes.toBytes("row0"))); + assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier)); + } + } +} \ No newline at end of file -- 1.9.1