From 9c4d92bb752dcbfea9736dff79efc292b8adad25 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 20 Nov 2018 23:40:47 +0000 Subject: [PATCH] HBASE-21503 - Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads. --- .../ReplicationSourceManager.java | 3 ++ .../regionserver/WALEntryStream.java | 1 + .../hbase/wal/AbstractFSWALProvider.java | 9 ++-- .../hbase/wal/TestAsyncFSWalProvider.java | 54 +++++++++++++++++++ 4 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWalProvider.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 20c1215950..4afcee3407 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -381,6 +381,7 @@ public class ReplicationSourceManager implements ReplicationListener { abortAndThrowIOExceptionWhenFail( () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName())); src.enqueueLog(walPath); + LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId()); } } } @@ -789,6 +790,8 @@ public class ReplicationSourceManager implements ReplicationListener { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); + LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", + newLog, source.getQueueId()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 0393af4970..cb25629768 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -173,6 +173,7 @@ class WALEntryStream implements Closeable { private void tryAdvanceEntry() throws IOException { if (checkReader()) { boolean beingWritten = readNextEntryAndRecordReaderPosition(); + LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten); if (currentEntry == null && !beingWritten) { // no more entries in this log file, and the file is already closed, i.e, rolled // Before dequeueing, we should always get one more attempt at reading. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index ccdc95f17a..b63a52bcae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -119,11 +119,12 @@ public abstract class AbstractFSWALProvider> implemen @Override public List getWALs() { - if (wal == null) { - return Collections.emptyList(); - } List wals = new ArrayList<>(1); - wals.add(wal); + try { + wals.add(this.getWAL(null)); + } catch (IOException e) { + LOG.trace("returning empty wal list due: ", e); + } return wals; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWalProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWalProvider.java new file mode 100644 index 0000000000..e6e8d42f3a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWalProvider.java @@ -0,0 +1,54 @@ +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +@Category({ RegionServerTests.class, SmallTests.class}) +public class TestAsyncFSWalProvider { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncFSWalProvider.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Logger LOG = LoggerFactory.getLogger(TestFSHLogProvider.class); + + private static Configuration conf; + private static FileSystem fs; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Make block sizes small. + TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); + TEST_UTIL.startMiniDFSCluster(3); + + // Set up a working space for our tests. + TEST_UTIL.createRootDir(); + conf = TEST_UTIL.getConfiguration(); + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + } + + @Test + public void testGetWALsNotEmptyOnceInit() throws IOException { + AsyncFSWALProvider provider = new AsyncFSWALProvider(); + WALFactory mockWALFactory = Mockito.mock(WALFactory.class); + provider.init(mockWALFactory, TEST_UTIL.getConfiguration(), "testGetWALsNotEmptyOnceInit"); + Assert.assertTrue(provider.getWALs().size()>0); + } + +} -- 2.17.2 (Apple Git-113)