From 7aa3da6ec2e70b134e730dffb41109555c4dbd5d Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 16 Mar 2018 17:41:31 +0800 Subject: [PATCH] HBASE-20206 WALEntryStream should not switch WAL file silently --- .../regionserver/RecoveredReplicationSource.java | 33 ---------- .../regionserver/ReplicationSource.java | 2 +- .../regionserver/ReplicationSourceShipper.java | 50 ++++++++------- .../regionserver/ReplicationSourceWALReader.java | 48 +++++++++++--- .../SerialReplicationSourceWALReader.java | 29 ++++++++- .../replication/regionserver/WALEntryBatch.java | 17 +++++ .../replication/regionserver/WALEntryStream.java | 2 +- .../TestReplicationEmptyWALRecovery.java | 2 +- .../regionserver/TestWALEntryStream.java | 73 +++++++++++++++++----- 9 files changed, 171 insertions(+), 85 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 169b469..f1ad99d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -64,38 +63,6 @@ public class RecoveredReplicationSource extends ReplicationSource { return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); } - private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader, - BlockingQueue entryBatchQueue, Path currentPath) throws InterruptedException { - LOG.trace("Didn't read any new entries from WAL"); - // we're done with queue recovery, shut ourself down - reader.setReaderRunning(false); - // shuts down shipper thread immediately - entryBatchQueue.put(new WALEntryBatch(0, currentPath)); - } - - @Override - protected ReplicationSourceWALReader createNewWALReader(String walGroupId, - PriorityBlockingQueue queue, long startPosition) { - if (replicationPeer.getPeerConfig().isSerial()) { - return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, - this) { - - @Override - protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { - handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); - } - }; - } else { - return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) { - - @Override - protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { - handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); - } - }; - } - } - public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { boolean hasPathChanged = false; PriorityBlockingQueue newPaths = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3480919..236c575 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -315,7 +315,7 @@ public class ReplicationSource implements ReplicationSourceInterface { return new ReplicationSourceShipper(conf, walGroupId, queue, this); } - protected ReplicationSourceWALReader createNewWALReader(String walGroupId, + private ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { return replicationPeer.getPeerConfig().isSerial() ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index aa5251e..03b0236 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -52,15 +51,15 @@ public class ReplicationSourceShipper extends Thread { FINISHED, // The worker is done processing a recovered queue } - protected final Configuration conf; + private final Configuration conf; protected final String walGroupId; protected final PriorityBlockingQueue queue; - protected final ReplicationSourceInterface source; + private final ReplicationSourceInterface source; // Last position in the log that we sent to ZooKeeper - protected long lastLoggedPosition = -1; + private long currentPosition = -1; // Path of the current log - protected volatile Path currentPath; + private Path currentPath; // Current state of the worker thread private WorkerState state; protected ReplicationSourceWALReader entryReader; @@ -125,12 +124,9 @@ public class ReplicationSourceShipper extends Thread { */ protected final void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); - long lastReadPosition = entryBatch.getLastWalPosition(); - currentPath = entryBatch.getLastWalPath(); int sleepMultiplier = 0; if (entries.isEmpty()) { - if (lastLoggedPosition != lastReadPosition) { - updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds()); + if (updateLogPosition(entryBatch)) { // if there was nothing to ship and it's not an error // set "ageOfLastShippedOp" to to indicate that we're current source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), @@ -168,16 +164,12 @@ public class ReplicationSourceShipper extends Thread { } else { sleepMultiplier = Math.max(sleepMultiplier - 1, 0); } - - if (this.lastLoggedPosition != lastReadPosition) { - // Clean up hfile references - int size = entries.size(); - for (int i = 0; i < size; i++) { - cleanUpHFileRefs(entries.get(i).getEdit()); - } - // Log and clean up WAL logs - updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds()); + // Clean up hfile references + for (Entry entry : entries) { + cleanUpHFileRefs(entry.getEdit()); } + // Log and clean up WAL logs + updateLogPosition(entryBatch); source.postShipEdits(entries, currentSize); // FIXME check relationship between wal group and overall @@ -224,10 +216,20 @@ public class ReplicationSourceShipper extends Thread { } } - private void updateLogPosition(long lastReadPosition, Map lastSeqIds) { - source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), - lastReadPosition, lastSeqIds, source.isRecovered()); - lastLoggedPosition = lastReadPosition; + private boolean updateLogPosition(WALEntryBatch batch) { + boolean updated = false; + // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file + // record on zk, so let's call it. + if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || + batch.getLastWalPosition() != currentPosition) { + source.getSourceManager().logPositionAndCleanOldLogs(batch.getLastWalPath(), + source.getQueueId(), batch.getLastWalPosition(), batch.getLastSeqIds(), + source.isRecovered()); + updated = true; + } + currentPath = batch.getLastWalPath(); + currentPosition = batch.getLastWalPosition(); + return updated; } public void startup(UncaughtExceptionHandler handler) { @@ -241,11 +243,11 @@ public class ReplicationSourceShipper extends Thread { } public Path getCurrentPath() { - return this.entryReader.getCurrentPath(); + return entryReader.getCurrentPath(); } public long getCurrentPosition() { - return this.lastLoggedPosition; + return currentPosition; } public void setWALReader(ReplicationSourceWALReader entryReader) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b125133..7b3311b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -130,6 +130,7 @@ class ReplicationSourceWALReader extends Thread { continue; } WALEntryBatch batch = readWALEntries(entryStream); + currentPosition = entryStream.getPosition(); if (batch != null) { // need to propagate the batch even it has no entries since it may carry the last // sequence id information for serial replication. @@ -138,9 +139,8 @@ class ReplicationSourceWALReader extends Thread { sleepMultiplier = 1; } else { // got no entries and didn't advance position in WAL handleEmptyWALEntryBatch(entryStream.getCurrentPath()); + entryStream.reset(); // reuse stream } - currentPosition = entryStream.getPosition(); - entryStream.reset(); // reuse stream } } catch (IOException e) { // stream related if (sleepMultiplier < maxRetriesMultiplier) { @@ -173,13 +173,31 @@ class ReplicationSourceWALReader extends Thread { batch.getNbEntries() >= replicationBatchCountCapacity; } + protected static final boolean switched(WALEntryStream entryStream, Path path) { + return !entryStream.getCurrentPath().equals(path); + } + protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { + Path currentPath = entryStream.getCurrentPath(); if (!entryStream.hasNext()) { - return null; + // check whether we have switched a file + if (currentPath != null && switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } else { + return null; + } + } + if (currentPath != null) { + if (switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } + } else { + // when reading from the entry stream first time we will enter here + currentPath = entryStream.getCurrentPath(); } WALEntryBatch batch = createBatch(entryStream); - do { + for (;;) { Entry entry = entryStream.next(); batch.setLastWalPosition(entryStream.getPosition()); entry = filterEntry(entry); @@ -188,13 +206,29 @@ class ReplicationSourceWALReader extends Thread { break; } } - } while (entryStream.hasNext()); + boolean hasNext = entryStream.hasNext(); + // always return if we have switched to a new file + if (switched(entryStream, currentPath)) { + batch.setEndOfFile(true); + break; + } + if (!hasNext) { + break; + } + } return batch; } - protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { LOG.trace("Didn't read any new entries from WAL"); - Thread.sleep(sleepForRetries); + if (source.isRecovered()) { + // we're done with queue recovery, shut ourself down + setReaderRunning(false); + // shuts down shipper thread immediately + entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath)); + } else { + Thread.sleep(sleepForRetries); + } } // if we get an EOF due to a zero-length log, and there are other logs in queue diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 5e9a9f6..9edcc8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -53,12 +53,26 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader @Override protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { + Path currentPath = entryStream.getCurrentPath(); if (!entryStream.hasNext()) { - return null; + // check whether we have switched a file + if (currentPath != null && switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } else { + return null; + } + } + if (currentPath != null) { + if (switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } + } else { + // when reading from the entry stream first time we will enter here + currentPath = entryStream.getCurrentPath(); } long positionBefore = entryStream.getPosition(); WALEntryBatch batch = createBatch(entryStream); - do { + for (;;) { Entry entry = entryStream.peek(); boolean doFiltering = true; if (firstCellInEntryBeforeFiltering == null) { @@ -99,7 +113,16 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader // actually remove the entry. removeEntryFromStream(entryStream, batch); } - } while (entryStream.hasNext()); + boolean hasNext = entryStream.hasNext(); + // always return if we have switched to a new file. + if (switched(entryStream, currentPath)) { + batch.setEndOfFile(true); + break; + } + if (!hasNext) { + break; + } + } return batch; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 31c3ac7..cbb563a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -43,6 +43,8 @@ class WALEntryBatch { private long heapSize = 0; // save the last sequenceid for each region if the table has serial-replication scope private Map lastSeqIds = new HashMap<>(); + // indicate that this is the end of the current file + private boolean endOfFile; /** * @param lastWalPath Path of the WAL the last entry in this batch was read from @@ -52,6 +54,13 @@ class WALEntryBatch { this.lastWalPath = lastWalPath; } + + static WALEntryBatch endOfFile(Path lastWalPath) { + WALEntryBatch batch = new WALEntryBatch(0, lastWalPath); + batch.setEndOfFile(true); + return batch; + } + public void addEntry(Entry entry) { walEntries.add(entry); } @@ -120,6 +129,14 @@ class WALEntryBatch { return lastSeqIds; } + public boolean isEndOfFile() { + return endOfFile; + } + + public void setEndOfFile(boolean endOfFile) { + this.endOfFile = endOfFile; + } + public void incrementNbRowKeys(int increment) { nbRowKeys += increment; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index c639a48..4e0e720 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -155,7 +155,6 @@ class WALEntryStream implements Closeable { /** * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned * false) - * @throws IOException */ public void reset() throws IOException { if (reader != null && currentPath != null) { @@ -394,6 +393,7 @@ class WALEntryStream implements Closeable { private void resetReader() throws IOException { try { + currentEntry = null; reader.reset(); seek(); } catch (FileNotFoundException fnfe) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 4effe41..0f67990 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -55,7 +55,7 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { * @param numRs number of regionservers */ private void waitForLogAdvance(int numRs) throws Exception { - Waiter.waitFor(conf1, 10000, new Waiter.Predicate() { + Waiter.waitFor(conf1, 1000000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { for (int i = 0; i < numRs; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index eb7d5a0..5073ce4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -327,8 +327,23 @@ public class TestWALEntryStream { } } + private ReplicationSourceWALReader createReader() { + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + Server mockServer = Mockito.mock(Server.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.getSourceManager()).thenReturn(mockSourceManager); + when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); + when(source.getWALFileLengthProvider()).thenReturn(log); + when(source.getServer()).thenReturn(mockServer); + ReplicationSourceWALReader reader = + new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); + reader.start(); + return reader; + } + @Test - public void testReplicationSourceWALReaderThread() throws Exception { + public void testReplicationSourceWALReader() throws Exception { appendEntriesToLog(3); // get ending position long position; @@ -340,20 +355,10 @@ public class TestWALEntryStream { position = entryStream.getPosition(); } - // start up a batcher - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - Server mockServer= Mockito.mock(Server.class); - ReplicationSource source = Mockito.mock(ReplicationSource.class); - when(source.getSourceManager()).thenReturn(mockSourceManager); - when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); - when(source.getWALFileLengthProvider()).thenReturn(log); - when(source.getServer()).thenReturn(mockServer); - ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf, - walQueue, 0, getDummyFilter(), source); + // start up a reader Path walPath = walQueue.peek(); - batcher.start(); - WALEntryBatch entryBatch = batcher.take(); + ReplicationSourceWALReader reader = createReader(); + WALEntryBatch entryBatch = reader.take(); // should've batched up our entries assertNotNull(entryBatch); @@ -363,11 +368,49 @@ public class TestWALEntryStream { assertEquals(3, entryBatch.getNbRowKeys()); appendToLog("foo"); - entryBatch = batcher.take(); + entryBatch = reader.take(); assertEquals(1, entryBatch.getNbEntries()); assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); } + // Testcase for HBASE-20206 + @Test + public void testReplicationSourceWALReaderWrongPosition() throws Exception { + appendEntriesToLog(1); + Path walPath = walQueue.peek(); + log.rollWriter(); + appendEntriesToLog(20); + long walLength = fs.getFileStatus(walPath).getLen(); + + ReplicationSourceWALReader reader = createReader(); + + WALEntryBatch entryBatch = reader.take(); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + + walLength, entryBatch.getLastWalPosition() <= walLength); + assertEquals(1, entryBatch.getNbEntries()); + assertTrue(entryBatch.isEndOfFile()); + + walPath = walQueue.peek(); + entryBatch = reader.take(); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(20, entryBatch.getNbEntries()); + assertFalse(entryBatch.isEndOfFile()); + + log.rollWriter(); + appendEntriesToLog(10); + entryBatch = reader.take(); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(0, entryBatch.getNbEntries()); + assertTrue(entryBatch.isEndOfFile()); + + walPath = walQueue.peek(); + entryBatch = reader.take(); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(10, entryBatch.getNbEntries()); + assertFalse(entryBatch.isEndOfFile()); + } + private String getRow(WAL.Entry entry) { Cell cell = entry.getEdit().getCells().get(0); return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); -- 2.7.4