From 96bfe4403e47ce9b7c2a656186692e6e4792b3a7 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Thu, 8 Aug 2019 12:19:09 +0100 Subject: [PATCH] HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters) --- .../ReplicationSourceWALReaderThread.java | 8 ++++ .../regionserver/TestWALEntryStream.java | 39 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index ec5e862985..3b3c0dbc57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -81,6 +81,8 @@ public class ReplicationSourceWALReaderThread extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private ReplicationSourceManager replicationSourceManager; + /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -109,6 +111,7 @@ public class ReplicationSourceWALReaderThread extends Thread { // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); + this.replicationSourceManager = manager; this.totalBufferUsed = manager.getTotalBufferUsed(); this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); @@ -156,6 +159,11 @@ public class ReplicationSourceWALReaderThread extends Thread { break; } } + } else { + replicationSourceManager.logPositionAndCleanOldLogs(logQueue.peek(), + this.replicationQueueInfo.getPeerClusterZnode(), + entryStream.getPosition(), + this.replicationQueueInfo.isQueueRecovered(), false); } } if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index a409fae03d..df886dffb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -25,6 +25,11 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -72,6 +77,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; @@ -371,6 +377,39 @@ public class TestWALEntryStream { assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo"); } + @Test + public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception { + appendEntriesToLog(3); + // get ending position + long position; + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + entryStream.next(); + entryStream.next(); + entryStream.next(); + position = entryStream.getPosition(); + } + // start up a readerThread with a WALEntryFilter that always filter the entries + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread( + mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() { + @Override public Entry filter(Entry entry) { + return null; + } + }, new MetricsSource("1")); + readerThread.start(); + Thread.sleep(100); + ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class); + verify(mockSourceManager, times(3)) + .logPositionAndCleanOldLogs(any(Path.class), + anyString(), + positionCaptor.capture(), + anyBoolean(), + anyBoolean()); + assertEquals(position, positionCaptor.getValue().longValue()); + } + @Test public void testWALKeySerialization() throws Exception { Map attributes = new HashMap(); -- 2.17.2 (Apple Git-113)