From 2c1f2900fbf6f12ec8a7dafaff88c06a8c89b8fc Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 28 May 2018 18:35:27 +0800 Subject: [PATCH] HBASE-20637 Polish the WAL switching when transiting from A to S --- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 50 +++++++++++++++ .../hbase/regionserver/wal/DualAsyncFSWAL.java | 71 ++++++++++++++++------ .../hbase/wal/SyncReplicationWALProvider.java | 2 +- .../hbase/replication/DualAsyncFSWALForTest.java | 4 +- 4 files changed, 105 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 9b4ce9c..5aed67a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -470,6 +470,44 @@ public class AsyncFSWAL extends AbstractFSWAL { // whether to issue a sync in the caller method. } + private void drainNonMarkerEditsAndFailSyncs() { + if (toWriteAppends.isEmpty()) { + return; + } + boolean hasNonMarkerEdits = false; + Iterator iter = toWriteAppends.descendingIterator(); + while (iter.hasNext()) { + FSWALEntry entry = iter.next(); + if (!entry.getEdit().isMetaEdit()) { + hasNonMarkerEdits = true; + break; + } + } + if (hasNonMarkerEdits) { + for (;;) { + iter.remove(); + if (!iter.hasNext()) { + break; + } + iter.next(); + } + unackedAppends.clear(); + // fail the sync futures which are under the txid of the first remaining edit, if none, fail + // all the sync futures. + long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid(); + IOException error = new IOException("WAL is closing, only marker edit is allowed"); + for (Iterator syncIter = syncFutures.iterator(); syncIter.hasNext();) { + SyncFuture future = syncIter.next(); + if (future.getTxid() < txid) { + future.done(future.getTxid(), error); + iter.remove(); + } else { + break; + } + } + } + } + private void consume() { consumeLock.lock(); try { @@ -512,6 +550,9 @@ public class AsyncFSWAL extends AbstractFSWAL { } waitingConsumePayloadsGatingSequence.set(nextCursor); } + if (markerEditOnly()) { + drainNonMarkerEditsAndFailSyncs(); + } appendAndSync(); if (hasConsumerTask.get()) { return; @@ -553,9 +594,18 @@ public class AsyncFSWAL extends AbstractFSWAL { return consumerScheduled.compareAndSet(false, true); } + // This is used by sync replication, where we are going to close the wal soon after we reopen all + // the regions. Will be overridden by sub classes. + protected boolean markerEditOnly() { + return false; + } + @Override public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException { + if (markerEditOnly() && !edits.isMetaEdit()) { + throw new IOException("WAL is closing, only marker edit is allowed"); + } long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); if (shouldScheduleConsumer()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index 3967e78..bf5b96d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -18,14 +18,19 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -35,20 +40,24 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @InterfaceAudience.Private public class DualAsyncFSWAL extends AsyncFSWAL { + private static final Logger LOG = LoggerFactory.getLogger(DualAsyncFSWAL.class); + private final FileSystem remoteFs; - private final Path remoteWalDir; + private final Path remoteWALDir; + + private volatile boolean skipRemoteWAL = false; - private volatile boolean skipRemoteWal = false; + private volatile boolean markerEditOnly = false; - public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, + public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, - eventLoopGroup, channelClass); + eventLoopGroup, channelClass); this.remoteFs = remoteFs; - this.remoteWalDir = remoteWalDir; + this.remoteWALDir = remoteWALDir; } // will be overridden in testcase @@ -61,20 +70,37 @@ public class DualAsyncFSWAL extends AsyncFSWAL { @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { AsyncWriter localWriter = super.createWriterInstance(path); - if (skipRemoteWal) { - return localWriter; - } - AsyncWriter remoteWriter; - boolean succ = false; - try { - remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName())); - succ = true; - } finally { - if (!succ) { - closeWriter(localWriter); + // retry forever if we can not create the remote writer to prevent aborting the RS due to log + // rolling error, unless the skipRemoteWal is set to true. + // TODO: since for now we only have one thread doing log rolling, this may block the rolling for + // other wals + Path remoteWAL = new Path(remoteWALDir, path.getName()); + for (int retry = 0;; retry++) { + if (skipRemoteWAL) { + return localWriter; + } + AsyncWriter remoteWriter; + try { + remoteWriter = createAsyncWriter(remoteFs, remoteWAL); + } catch (IOException e) { + LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e); + try { + Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); + } catch (InterruptedException ie) { + // restore the interrupt state + Thread.currentThread().interrupt(); + Closeables.close(localWriter, true); + throw (IOException) new InterruptedIOException().initCause(ie); + } + continue; } + return createCombinedAsyncWriter(localWriter, remoteWriter); } - return createCombinedAsyncWriter(localWriter, remoteWriter); + } + + @Override + protected boolean markerEditOnly() { + return markerEditOnly; } // Allow temporarily skipping the creation of remote writer. When failing to write to the remote @@ -82,7 +108,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL { // need to write a close marker when closing a region, and if it fails, the whole rs will abort. // So here we need to skip the creation of remote writer and make it possible to write the region // close marker. - public void skipRemoteWal() { - this.skipRemoteWal = true; + // Setting markerEdit only to true is for transiting from A to S, where we need to give up writing + // any pending wal entries as they will be discarded. The remote cluster will replicated the + // correct data back later. We still need to allow writing marker edits such as close region event + // to allow closing a region. + public void skipRemoteWAL(boolean markerEditOnly) { + if (markerEditOnly) { + this.markerEditOnly = true; + } + this.skipRemoteWAL = true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 82f8a89..b9fffcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -291,7 +291,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen try { Optional opt = peerId2WAL.get(peerId); if (opt != null) { - opt.ifPresent(DualAsyncFSWAL::skipRemoteWal); + opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY)); } else { // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more. peerId2WAL.put(peerId, Optional.empty()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java index fb3daf2..62000b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java @@ -97,11 +97,11 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL { } } - public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, + public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { - super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists, + super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, eventLoopGroup, channelClass); } -- 2.7.4