From 8bbad72c8fd156664d80aa60ce4da49dd9f7b389 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Thu, 8 Aug 2019 12:19:09 +0100 Subject: [PATCH] HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters) --- .../regionserver/ReplicationSource.java | 1 + .../ReplicationSourceManager.java | 30 +++++++++----- .../ReplicationSourceWALReaderThread.java | 9 +++++ .../regionserver/TestWALEntryStream.java | 39 +++++++++++++++++++ 4 files changed, 69 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index ad941e0fb4..c7780bb32c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -783,6 +783,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } private void updateLogPosition(long lastReadPosition) { + manager.setPendingShipment(false); manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition, this.replicationQueueInfo.isQueueRecovered(), false); lastLoggedPosition = lastReadPosition; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index dbe9e63e01..6b8b6e273c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -121,9 +121,10 @@ public class ReplicationSourceManager implements ReplicationListener { private final Random rand; private final boolean replicationForBulkLoadDataEnabled; - private AtomicLong totalBufferUsed = new AtomicLong(); + private boolean pendingShipment; + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues @@ -189,14 +190,20 @@ public class ReplicationSourceManager implements ReplicationListener { * @param queueRecovered indicates if this queue comes from another region server * @param holdLogInZK if true then the log is retained in ZK */ - public void logPositionAndCleanOldLogs(Path log, String id, long position, + public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, boolean holdLogInZK) { - String fileName = log.getName(); - this.replicationQueues.setLogPosition(id, fileName, position); - if (holdLogInZK) { - return; + if (!this.pendingShipment) { + String fileName = log.getName(); + this.replicationQueues.setLogPosition(id, fileName, position); + if (holdLogInZK) { + return; + } + cleanOldLogs(fileName, id, queueRecovered); } - cleanOldLogs(fileName, id, queueRecovered); + } + + public synchronized void setPendingShipment(boolean pendingShipment) { + this.pendingShipment = pendingShipment; } /** @@ -209,9 +216,12 @@ public class ReplicationSourceManager implements ReplicationListener { public void cleanOldLogs(String key, String id, boolean queueRecovered) { String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key); if (queueRecovered) { - SortedSet wals = walsByIdRecoveredQueues.get(id).get(logPrefix); - if (wals != null && !wals.first().equals(key)) { - cleanOldLogs(wals, key, id); + Map> walsForPeer = walsByIdRecoveredQueues.get(id); + if(walsForPeer != null) { + SortedSet wals = walsForPeer.get(logPrefix); + if (wals != null && !wals.first().equals(key)) { + cleanOldLogs(wals, key, id); + } } } else { synchronized (this.walsById) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index ec5e862985..1d94a7a2c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -81,6 +81,8 @@ public class ReplicationSourceWALReaderThread extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private ReplicationSourceManager replicationSourceManager; + /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -109,6 +111,7 @@ public class ReplicationSourceWALReaderThread extends Thread { // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); + this.replicationSourceManager = manager; this.totalBufferUsed = manager.getTotalBufferUsed(); this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); @@ -148,6 +151,7 @@ public class ReplicationSourceWALReaderThread extends Thread { long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry); + replicationSourceManager.setPendingShipment(true); updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad); // Stop if too many entries or too big @@ -156,6 +160,11 @@ public class ReplicationSourceWALReaderThread extends Thread { break; } } + } else { + replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(), + this.replicationQueueInfo.getPeerClusterZnode(), + entryStream.getPosition(), + this.replicationQueueInfo.isQueueRecovered(), false); } } if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index a409fae03d..df886dffb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -25,6 +25,11 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -72,6 +77,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; @@ -371,6 +377,39 @@ public class TestWALEntryStream { assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo"); } + @Test + public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception { + appendEntriesToLog(3); + // get ending position + long position; + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + entryStream.next(); + entryStream.next(); + entryStream.next(); + position = entryStream.getPosition(); + } + // start up a readerThread with a WALEntryFilter that always filter the entries + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread( + mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() { + @Override public Entry filter(Entry entry) { + return null; + } + }, new MetricsSource("1")); + readerThread.start(); + Thread.sleep(100); + ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class); + verify(mockSourceManager, times(3)) + .logPositionAndCleanOldLogs(any(Path.class), + anyString(), + positionCaptor.capture(), + anyBoolean(), + anyBoolean()); + assertEquals(position, positionCaptor.getValue().longValue()); + } + @Test public void testWALKeySerialization() throws Exception { Map attributes = new HashMap(); -- 2.17.2 (Apple Git-113)