From 0bfa7a5b1cd72d323b935948b286d0bb8ced60fe Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 29 May 2018 11:33:30 +0800 Subject: [PATCH] HBASE-20637 Polish the WAL switching when transiting from A to S --- .../org/apache/hadoop/hbase/fs/HFileSystem.java | 2 +- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 52 +++++++++++++++- .../hbase/regionserver/wal/DualAsyncFSWAL.java | 71 ++++++++++++++++------ .../ReplaySyncReplicationWALCallable.java | 2 +- .../hbase/wal/SyncReplicationWALProvider.java | 2 +- .../hbase/replication/DualAsyncFSWALForTest.java | 4 +- .../replication/TestSyncReplicationActive.java | 42 +++++++++++-- 8 files changed, 147 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java index bc3d85e..201aa9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java @@ -142,7 +142,7 @@ public class HFileSystem extends FilterFileSystem { * Returns the underlying filesystem * @return The underlying FileSystem for this FilterFileSystem object. */ - public FileSystem getBackingFs() throws IOException { + public FileSystem getBackingFs() { return fs; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 90f3099..bed36ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2876,7 +2876,7 @@ public class HRegionServer extends HasThread implements /** * @return Return the walFs. 
*/ - public FileSystem getWALFileSystem() { + public HFileSystem getWALFileSystem() { return walFs; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 9b4ce9c..7f3e30b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -52,12 +52,12 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; @@ -470,6 +470,44 @@ public class AsyncFSWAL extends AbstractFSWAL { // whether to issue a sync in the caller method. } + private void drainNonMarkerEditsAndFailSyncs() { + if (toWriteAppends.isEmpty()) { + return; + } + boolean hasNonMarkerEdits = false; + Iterator iter = toWriteAppends.descendingIterator(); + while (iter.hasNext()) { + FSWALEntry entry = iter.next(); + if (!entry.getEdit().isMetaEdit()) { + hasNonMarkerEdits = true; + break; + } + } + if (hasNonMarkerEdits) { + for (;;) { + iter.remove(); + if (!iter.hasNext()) { + break; + } + iter.next(); + } + unackedAppends.clear(); + // fail the sync futures which are under the txid of the first remaining edit, if none, fail + // all the sync futures. + long txid = toWriteAppends.isEmpty() ? 
Long.MAX_VALUE : toWriteAppends.peek().getTxid(); + IOException error = new IOException("WAL is closing, only marker edit is allowed"); + for (Iterator syncIter = syncFutures.iterator(); syncIter.hasNext();) { + SyncFuture future = syncIter.next(); + if (future.getTxid() < txid) { + future.done(future.getTxid(), error); + syncIter.remove(); + } else { + break; + } + } + } + } + private void consume() { consumeLock.lock(); try { @@ -512,6 +550,9 @@ public class AsyncFSWAL extends AbstractFSWAL { } waitingConsumePayloadsGatingSequence.set(nextCursor); } + if (markerEditOnly()) { + drainNonMarkerEditsAndFailSyncs(); + } appendAndSync(); if (hasConsumerTask.get()) { return; @@ -553,9 +594,18 @@ public class AsyncFSWAL extends AbstractFSWAL { return consumerScheduled.compareAndSet(false, true); } + // This is used by sync replication, where we are going to close the wal soon after we reopen all + // the regions. Will be overridden by sub classes. + protected boolean markerEditOnly() { + return false; + } + @Override public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException { + if (markerEditOnly() && !edits.isMetaEdit()) { + throw new IOException("WAL is closing, only marker edit is allowed"); + } long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); if (shouldScheduleConsumer()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index 3967e78..bf5b96d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -18,14 +18,19 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.List; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -35,20 +40,24 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @InterfaceAudience.Private public class DualAsyncFSWAL extends AsyncFSWAL { + private static final Logger LOG = LoggerFactory.getLogger(DualAsyncFSWAL.class); + private final FileSystem remoteFs; - private final Path remoteWalDir; + private final Path remoteWALDir; + + private volatile boolean skipRemoteWAL = false; - private volatile boolean skipRemoteWal = false; + private volatile boolean markerEditOnly = false; - public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, + public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, - eventLoopGroup, channelClass); + eventLoopGroup, channelClass); this.remoteFs = remoteFs; - this.remoteWalDir = remoteWalDir; + this.remoteWALDir = remoteWALDir; } // will be overridden in testcase @@ -61,20 +70,37 @@ public class DualAsyncFSWAL extends AsyncFSWAL { @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { AsyncWriter 
localWriter = super.createWriterInstance(path); - if (skipRemoteWal) { - return localWriter; - } - AsyncWriter remoteWriter; - boolean succ = false; - try { - remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName())); - succ = true; - } finally { - if (!succ) { - closeWriter(localWriter); + // retry forever if we can not create the remote writer to prevent aborting the RS due to log + // rolling error, unless the skipRemoteWal is set to true. + // TODO: since for now we only have one thread doing log rolling, this may block the rolling for + // other wals + Path remoteWAL = new Path(remoteWALDir, path.getName()); + for (int retry = 0;; retry++) { + if (skipRemoteWAL) { + return localWriter; + } + AsyncWriter remoteWriter; + try { + remoteWriter = createAsyncWriter(remoteFs, remoteWAL); + } catch (IOException e) { + LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e); + try { + Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); + } catch (InterruptedException ie) { + // restore the interrupt state + Thread.currentThread().interrupt(); + Closeables.close(localWriter, true); + throw (IOException) new InterruptedIOException().initCause(ie); + } + continue; } + return createCombinedAsyncWriter(localWriter, remoteWriter); } - return createCombinedAsyncWriter(localWriter, remoteWriter); + } + + @Override + protected boolean markerEditOnly() { + return markerEditOnly; } // Allow temporarily skipping the creation of remote writer. When failing to write to the remote @@ -82,7 +108,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL { // need to write a close marker when closing a region, and if it fails, the whole rs will abort. // So here we need to skip the creation of remote writer and make it possible to write the region // close marker. 
- public void skipRemoteWal() { - this.skipRemoteWal = true; + // Setting markerEditOnly to true is for transiting from A to S, where we need to give up writing + // any pending wal entries as they will be discarded. The remote cluster will replicate the + // correct data back later. We still need to allow writing marker edits such as close region event + // to allow closing a region. + public void skipRemoteWAL(boolean markerEditOnly) { + if (markerEditOnly) { + this.markerEditOnly = true; + } + this.skipRemoteWAL = true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 3cf065c..7e0adb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -101,7 +101,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { @Override public void init(byte[] parameter, HRegionServer rs) { this.rs = rs; - this.fs = rs.getWALFileSystem(); + this.fs = rs.getWALFileSystem().getBackingFs(); this.conf = rs.getConfiguration(); try { ReplaySyncReplicationWALParameter param = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 82f8a89..b9fffcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -291,7 +291,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen try { Optional opt = peerId2WAL.get(peerId); if (opt != null) { - 
opt.ifPresent(DualAsyncFSWAL::skipRemoteWal); + opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY)); } else { // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more. peerId2WAL.put(peerId, Optional.empty()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java index fb3daf2..62000b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java @@ -97,11 +97,11 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL { } } - public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, + public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { - super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists, + super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, eventLoopGroup, channelClass); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index fce0cdf..42adab6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -17,13 +17,28 @@ */ package org.apache.hadoop.hbase.replication; +import static org.hamcrest.CoreMatchers.containsString; +import static 
org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; @@ -66,8 +81,27 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { // confirm that the data is there after we convert the peer to DA verify(UTIL2, 0, 100); - UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, - SyncReplicationState.STANDBY); + try (AsyncConnection conn = + ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build(); + CompletableFuture future = + table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000))); + Thread.sleep(2000); + // should hang on rolling + assertFalse(future.isDone()); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + try { + future.get(); + fail("should fail because of the wal is closing"); + } catch (ExecutionException e) { + // expected + 
assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed")); + } + } + // confirm that the data has not been persisted + HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); + assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty()); UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.ACTIVE); @@ -89,8 +123,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId)); Assert.assertTrue(files.length > 0); for (FileStatus file : files) { - try (Reader reader = - WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) { + try ( + Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) { Entry entry = reader.next(); Assert.assertTrue(entry != null); while (entry != null) { -- 2.7.4