From 9f2ab0892bb127e4450463cf82fa24137e5a9bea Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Fri, 16 Mar 2018 22:02:25 +0800
Subject: [PATCH] HBASE-20206 WALEntryStream should not switch WAL file silently

---
 .../regionserver/RecoveredReplicationSource.java | 33 ----
 .../RecoveredReplicationSourceShipper.java | 11 +-
 .../regionserver/ReplicationSource.java | 2 +-
 .../regionserver/ReplicationSourceShipper.java | 64 +++----
 .../regionserver/ReplicationSourceWALReader.java | 48 +++++-
 .../SerialReplicationSourceWALReader.java | 29 +++-
 .../replication/regionserver/WALEntryBatch.java | 21 +++
 .../replication/regionserver/WALEntryStream.java | 2 +-
 .../TestReplicationEmptyWALRecovery.java | 2 +-
 .../regionserver/TestWALEntryStream.java | 188 ++++++++++++++++-----
 10 files changed, 273 insertions(+), 127 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 169b469..f1ad99d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -64,38 +63,6 @@ public class RecoveredReplicationSource extends ReplicationSource {
     return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
   }
 
-  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
-      BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
-    LOG.trace("Didn't read any new entries from WAL");
-    // we're done with queue recovery, shut ourself down
-    reader.setReaderRunning(false);
-    // shuts down shipper thread immediately
-    entryBatchQueue.put(new WALEntryBatch(0, currentPath));
-  }
-
-  @Override
-  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
-      PriorityBlockingQueue<Path> queue, long startPosition) {
-    if (replicationPeer.getPeerConfig().isSerial()) {
-      return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
-          this) {
-
-        @Override
-        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
-          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
-        }
-      };
-    } else {
-      return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {
-
-        @Override
-        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
-          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
-        }
-      };
-    }
-  }
-
   public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
     boolean hasPathChanged = false;
     PriorityBlockingQueue<Path> newPaths =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 1ae5cb9..b04f338 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -48,13 +48,10 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   }
 
   @Override
-  protected void postShipEdits(WALEntryBatch entryBatch) {
-    if (entryBatch.getWalEntries().isEmpty()) {
-      LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-          + source.getQueueId());
-      source.getSourceMetrics().incrCompletedRecoveryQueue();
-      setWorkerState(WorkerState.FINISHED);
-    }
+  protected void noMoreData() {
+    LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
+    source.getSourceMetrics().incrCompletedRecoveryQueue();
+    setWorkerState(WorkerState.FINISHED);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3480919..236c575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -315,7 +315,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return new ReplicationSourceShipper(conf, walGroupId, queue, this);
   }
 
-  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
+  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
     return replicationPeer.getPeerConfig().isSerial()
       ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index aa5251e..b5713f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -52,15 +51,15 @@
     FINISHED, // The worker is done processing a recovered queue
   }
 
-  protected final Configuration conf;
+  private final Configuration conf;
   protected final String walGroupId;
   protected final PriorityBlockingQueue<Path> queue;
-  protected final ReplicationSourceInterface source;
+  private final ReplicationSourceInterface source;
   // Last position in the log that we sent to ZooKeeper
-  protected long lastLoggedPosition = -1;
+  private long currentPosition = -1;
   // Path of the current log
-  protected volatile Path currentPath;
+  private Path currentPath;
   // Current state of the worker thread
   private WorkerState state;
   protected ReplicationSourceWALReader entryReader;
@@ -97,8 +96,12 @@
       }
       try {
         WALEntryBatch entryBatch = entryReader.take();
-        shipEdits(entryBatch);
-        postShipEdits(entryBatch);
+        // the NO_MORE_DATA instance has no path so do not call shipEdits
+        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
+          noMoreData();
+        } else {
+          shipEdits(entryBatch);
+        }
       } catch (InterruptedException e) {
         LOG.trace("Interrupted while waiting for next replication entry batch", e);
         Thread.currentThread().interrupt();
@@ -113,7 +116,7 @@
   }
 
   // To be implemented by recovered shipper
-  protected void postShipEdits(WALEntryBatch entryBatch) {
+  protected void noMoreData() {
   }
 
   // To be implemented by recovered shipper
@@ -123,14 +126,11 @@
   /**
    * Do the shipping logic
    */
-  protected final void shipEdits(WALEntryBatch entryBatch) {
+  private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
-    long lastReadPosition = entryBatch.getLastWalPosition();
-    currentPath = entryBatch.getLastWalPath();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      if (lastLoggedPosition != lastReadPosition) {
-        updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+      if (updateLogPosition(entryBatch)) {
        // if there was nothing to ship and it's not an error
        // set "ageOfLastShippedOp" to to indicate that we're current
        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@@ -168,16 +168,12 @@
      } else {
        sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
      }
-
-      if (this.lastLoggedPosition != lastReadPosition) {
-        // Clean up hfile references
-        int size = entries.size();
-        for (int i = 0; i < size; i++) {
-          cleanUpHFileRefs(entries.get(i).getEdit());
-        }
-        // Log and clean up WAL logs
-        updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+      // Clean up hfile references
+      for (Entry entry : entries) {
+        cleanUpHFileRefs(entry.getEdit());
      }
+      // Log and clean up WAL logs
+      updateLogPosition(entryBatch);
      source.postShipEdits(entries, currentSize);
 
      // FIXME check relationship between wal group and overall
@@ -224,10 +220,20 @@
     }
   }
 
-  private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
-      lastReadPosition, lastSeqIds, source.isRecovered());
-    lastLoggedPosition = lastReadPosition;
+  private boolean updateLogPosition(WALEntryBatch batch) {
+    boolean updated = false;
+    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
+    // record on zk, so let's call it.
+    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
+      batch.getLastWalPosition() != currentPosition) {
+      source.getSourceManager().logPositionAndCleanOldLogs(batch.getLastWalPath(),
+        source.getQueueId(), batch.getLastWalPosition(), batch.getLastSeqIds(),
+        source.isRecovered());
+      updated = true;
+    }
+    currentPath = batch.getLastWalPath();
+    currentPosition = batch.getLastWalPosition();
+    return updated;
   }
 
   public void startup(UncaughtExceptionHandler handler) {
@@ -241,11 +247,11 @@
   }
 
   public Path getCurrentPath() {
-    return this.entryReader.getCurrentPath();
+    return entryReader.getCurrentPath();
   }
 
   public long getCurrentPosition() {
-    return this.lastLoggedPosition;
+    return currentPosition;
   }
 
   public void setWALReader(ReplicationSourceWALReader entryReader) {
@@ -256,7 +262,7 @@
     return 0;
   }
 
-  protected final boolean isActive() {
+  private boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b125133..afe5157 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -130,6 +130,7 @@ class ReplicationSourceWALReader extends Thread {
            continue;
          }
          WALEntryBatch batch = readWALEntries(entryStream);
+          currentPosition = entryStream.getPosition();
          if (batch != null) {
            // need to propagate the batch even it has no entries since it may carry the last
            // sequence id information for serial replication.
@@ -138,9 +139,8 @@
            sleepMultiplier = 1;
          } else { // got no entries and didn't advance position in WAL
            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
+            entryStream.reset(); // reuse stream
          }
-          currentPosition = entryStream.getPosition();
-          entryStream.reset(); // reuse stream
        }
      } catch (IOException e) { // stream related
        if (sleepMultiplier < maxRetriesMultiplier) {
@@ -173,13 +173,31 @@
        batch.getNbEntries() >= replicationBatchCountCapacity;
   }
 
+  protected static final boolean switched(WALEntryStream entryStream, Path path) {
+    return !entryStream.getCurrentPath().equals(path);
+  }
+
   protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
+    Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
-      return null;
+      // check whether we have switched a file
+      if (currentPath != null && switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      } else {
+        return null;
+      }
+    }
+    if (currentPath != null) {
+      if (switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      }
+    } else {
+      // when reading from the entry stream first time we will enter here
+      currentPath = entryStream.getCurrentPath();
     }
     WALEntryBatch batch = createBatch(entryStream);
-    do {
+    for (;;) {
       Entry entry = entryStream.next();
       batch.setLastWalPosition(entryStream.getPosition());
       entry = filterEntry(entry);
@@ -188,13 +206,29 @@
          break;
        }
      }
-    } while (entryStream.hasNext());
+      boolean hasNext = entryStream.hasNext();
+      // always return if we have switched to a new file
+      if (switched(entryStream, currentPath)) {
+        batch.setEndOfFile(true);
+        break;
+      }
+      if (!hasNext) {
+        break;
+      }
+    }
     return batch;
   }
 
-  protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
-    Thread.sleep(sleepForRetries);
+    if (source.isRecovered()) {
+      // we're done with queue recovery, shut ourself down
+      setReaderRunning(false);
+      // shuts down shipper thread immediately
+      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
+    } else {
+      Thread.sleep(sleepForRetries);
+    }
   }
 
   // if we get an EOF due to a zero-length log, and there are other logs in queue
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 5e9a9f6..9edcc8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -53,12 +53,26 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   @Override
   protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
+    Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
-      return null;
+      // check whether we have switched a file
+      if (currentPath != null && switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      } else {
+        return null;
+      }
+    }
+    if (currentPath != null) {
+      if (switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      }
+    } else {
+      // when reading from the entry stream first time we will enter here
+      currentPath = entryStream.getCurrentPath();
     }
     long positionBefore = entryStream.getPosition();
     WALEntryBatch batch = createBatch(entryStream);
-    do {
+    for (;;) {
       Entry entry = entryStream.peek();
       boolean doFiltering = true;
       if (firstCellInEntryBeforeFiltering == null) {
@@ -99,7 +113,16 @@
        // actually remove the entry.
        removeEntryFromStream(entryStream, batch);
      }
-    } while (entryStream.hasNext());
+      boolean hasNext = entryStream.hasNext();
+      // always return if we have switched to a new file.
+      if (switched(entryStream, currentPath)) {
+        batch.setEndOfFile(true);
+        break;
+      }
+      if (!hasNext) {
+        break;
+      }
+    }
     return batch;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 31c3ac7..39b44f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -30,6 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 class WALEntryBatch {
+
+  // used by recovered replication queue to indicate that all the entries have been read.
+  public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
+
   private List<Entry> walEntries;
   // last WAL that was read
   private Path lastWalPath;
@@ -43,6 +47,8 @@
   private long heapSize = 0;
   // save the last sequenceid for each region if the table has serial-replication scope
   private Map<String, Long> lastSeqIds = new HashMap<>();
+  // indicate that this is the end of the current file
+  private boolean endOfFile;
 
   /**
    * @param lastWalPath Path of the WAL the last entry in this batch was read from
   */
@@ -52,6 +58,13 @@
     this.lastWalPath = lastWalPath;
   }
 
+
+  static WALEntryBatch endOfFile(Path lastWalPath) {
+    WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
+    batch.setEndOfFile(true);
+    return batch;
+  }
+
   public void addEntry(Entry entry) {
     walEntries.add(entry);
   }
@@ -120,6 +133,14 @@
     return lastSeqIds;
   }
 
+  public boolean isEndOfFile() {
+    return endOfFile;
+  }
+
+  public void setEndOfFile(boolean endOfFile) {
+    this.endOfFile = endOfFile;
+  }
+
   public void incrementNbRowKeys(int increment) {
     nbRowKeys += increment;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index c639a48..4e0e720 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -155,7 +155,6 @@
   /**
    * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
    * false)
-   * @throws IOException
   */
   public void reset() throws IOException {
     if (reader != null && currentPath != null) {
@@ -394,6 +393,7 @@
 
   private void resetReader() throws IOException {
     try {
+      currentEntry = null;
       reader.reset();
       seek();
     } catch (FileNotFoundException fnfe) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 4effe41..0f67990 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -55,7 +55,7 @@
    * @param numRs number of regionservers
    */
   private void waitForLogAdvance(int numRs) throws Exception {
-    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+    Waiter.waitFor(conf1, 1000000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         for (int i = 0; i < numRs; i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index eb7d5a0..2670756 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -75,7 +76,7 @@
       HBaseClassTestRule.forClass(TestWALEntryStream.class);
 
   private static HBaseTestingUtility TEST_UTIL;
-  private static Configuration conf;
+  private static Configuration CONF;
   private static FileSystem fs;
   private static MiniDFSCluster cluster;
   private static final TableName tableName = TableName.valueOf("tablename");
@@ -102,7 +103,7 @@
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
-    conf = TEST_UTIL.getConfiguration();
+    CONF = TEST_UTIL.getConfiguration();
     TEST_UTIL.startMiniDFSCluster(3);
     cluster = TEST_UTIL.getDFSCluster();
@@ -118,7 +119,7 @@
   public void setUp() throws Exception {
     walQueue = new PriorityBlockingQueue<>();
     pathWatcher = new PathWatcher();
-    final WALFactory wals = new WALFactory(conf, tn.getMethodName());
+    final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
     wals.getWALProvider().addWALActionsListener(pathWatcher);
     log = wals.getWAL(info);
@@ -144,13 +145,13 @@
     mvcc.advanceTo(1);
 
     for (int i = 0; i < nbRows; i++) {
-      appendToLogPlus(walEditKVs);
+      appendToLogAndSync(walEditKVs);
     }
 
     log.rollWriter();
 
     try (WALEntryStream entryStream =
MetricsSource("1"))) { int i = 0; while (entryStream.hasNext()) { assertNotNull(entryStream.next()); @@ -174,10 +175,10 @@ public class TestWALEntryStream { */ @Test public void testAppendsWithRolls() throws Exception { - appendToLog(); + appendToLogAndSync(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.peek(); @@ -189,9 +190,9 @@ public class TestWALEntryStream { oldPos = entryStream.getPosition(); } - appendToLog(); + appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, log, null, new MetricsSource("1"))) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); @@ -201,11 +202,11 @@ public class TestWALEntryStream { } // We rolled but we still should see the end of the first log and get that item - appendToLog(); + appendToLogAndSync(); log.rollWriter(); - appendToLog(); + appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, log, null, new MetricsSource("1"))) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -231,7 +232,7 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -256,7 +257,7 @@ public class TestWALEntryStream { public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -279,7 +280,7 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); @@ -287,7 +288,7 @@ public class TestWALEntryStream { } // next stream should picks up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -302,16 +303,16 @@ public class TestWALEntryStream { @Test public void testPosition() throws Exception { long lastPosition = 0; - appendEntriesToLog(3); + appendEntriesToLogAndSync(3); // read only one element - try (WALEntryStream entryStream = new 
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
         log, null, new MetricsSource("1"))) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -322,38 +323,44 @@
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       assertFalse(entryStream.hasNext());
     }
   }
 
+  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    Server mockServer = Mockito.mock(Server.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.getSourceManager()).thenReturn(mockSourceManager);
+    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+    when(source.getWALFileLengthProvider()).thenReturn(log);
+    when(source.getServer()).thenReturn(mockServer);
+    when(source.isRecovered()).thenReturn(recovered);
+    ReplicationSourceWALReader reader =
+        new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+    reader.start();
+    return reader;
+  }
+
   @Test
-  public void testReplicationSourceWALReaderThread() throws Exception {
-    appendEntriesToLog(3);
+  public void testReplicationSourceWALReader() throws Exception {
+    appendEntriesToLogAndSync(3);
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
       position = entryStream.getPosition();
     }
 
-    // start up a batcher
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    Server mockServer= Mockito.mock(Server.class);
-    ReplicationSource source = Mockito.mock(ReplicationSource.class);
-    when(source.getSourceManager()).thenReturn(mockSourceManager);
-    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
-    when(source.getWALFileLengthProvider()).thenReturn(log);
-    when(source.getServer()).thenReturn(mockServer);
-    ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
-        walQueue, 0, getDummyFilter(), source);
+    // start up a reader
     Path walPath = walQueue.peek();
-    batcher.start();
-    WALEntryBatch entryBatch = batcher.take();
+    ReplicationSourceWALReader reader = createReader(false, CONF);
+    WALEntryBatch entryBatch = reader.take();
 
     // should've batched up our entries
     assertNotNull(entryBatch);
@@ -363,11 +370,96 @@
     assertEquals(3, entryBatch.getNbRowKeys());
 
     appendToLog("foo");
-    entryBatch = batcher.take();
+    entryBatch = reader.take();
     assertEquals(1, entryBatch.getNbEntries());
assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); } + @Test + public void testReplicationSourceWALReaderRecovered() throws Exception { + appendEntriesToLogAndSync(10); + Path walPath = walQueue.peek(); + log.rollWriter(); + appendEntriesToLogAndSync(5); + log.shutdown(); + + Configuration conf = new Configuration(CONF); + conf.setInt("replication.source.nb.capacity", 10); + + ReplicationSourceWALReader reader = createReader(true, conf); + + WALEntryBatch batch = reader.take(); + assertEquals(walPath, batch.getLastWalPath()); + assertEquals(10, batch.getNbEntries()); + assertFalse(batch.isEndOfFile()); + + batch = reader.take(); + assertEquals(walPath, batch.getLastWalPath()); + assertEquals(0, batch.getNbEntries()); + assertTrue(batch.isEndOfFile()); + + walPath = walQueue.peek(); + batch = reader.take(); + assertEquals(walPath, batch.getLastWalPath()); + assertEquals(5, batch.getNbEntries()); + // Actually this should be true but we haven't handled this yet since for a normal queue the + // last one is always open... Not a big deal for now. + assertFalse(batch.isEndOfFile()); + + assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); + } + + // Testcase for HBASE-20206 + @Test + public void testReplicationSourceWALReaderWrongPosition() throws Exception { + appendEntriesToLogAndSync(1); + Path walPath = walQueue.peek(); + log.rollWriter(); + appendEntriesToLogAndSync(20); + TEST_UTIL.waitFor(5000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return fs.getFileStatus(walPath).getLen() > 0; + } + + @Override + public String explainFailure() throws Exception { + return walPath + " has not been closed yet"; + } + + }); + long walLength = fs.getFileStatus(walPath).getLen(); + + ReplicationSourceWALReader reader = createReader(false, CONF); + + WALEntryBatch entryBatch = reader.take(); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + + walLength, entryBatch.getLastWalPosition() <= walLength); + assertEquals(1, entryBatch.getNbEntries()); + assertTrue(entryBatch.isEndOfFile()); + + Path walPath2 = walQueue.peek(); + entryBatch = reader.take(); + assertEquals(walPath2, entryBatch.getLastWalPath()); + assertEquals(20, entryBatch.getNbEntries()); + assertFalse(entryBatch.isEndOfFile()); + + log.rollWriter(); + appendEntriesToLogAndSync(10); + entryBatch = reader.take(); + assertEquals(walPath2, entryBatch.getLastWalPath()); + assertEquals(0, entryBatch.getNbEntries()); + assertTrue(entryBatch.isEndOfFile()); + + Path walPath3 = walQueue.peek(); + entryBatch = reader.take(); + assertEquals(walPath3, entryBatch.getLastWalPath()); + assertEquals(10, entryBatch.getNbEntries()); + assertFalse(entryBatch.isEndOfFile()); + } + private String getRow(WAL.Entry entry) { Cell cell = entry.getEdit().getCells().get(0); return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); @@ -380,22 +472,28 @@ public class TestWALEntryStream { log.sync(txid); } - private void appendEntriesToLog(int count) throws IOException { + private void appendEntriesToLogAndSync(int count) throws IOException { + long txid = -1L; for (int i = 0; i < count; i++) { - appendToLog(); + txid = appendToLog(1); } + log.sync(txid); } - private void appendToLog() throws IOException { - appendToLogPlus(1); + private void appendToLogAndSync() throws IOException { + appendToLogAndSync(1); } - private void appendToLogPlus(int count) throws IOException { - final 
-    final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
-        System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
+  private void appendToLogAndSync(int count) throws IOException {
+    long txid = appendToLog(count);
     log.sync(txid);
   }
 
+  private long appendToLog(int count) throws IOException {
+    return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+        System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
+  }
+
   private WALEdit getWALEdits(int count) {
     WALEdit edit = new WALEdit();
     for (int i = 0; i < count; i++) {
@@ -439,7 +537,7 @@
     appendToLog("2");
     long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
         p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());
-- 
2.7.4