diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c11496b8f3..c6b1c03cca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -29,11 +29,15 @@ import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -181,6 +185,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final long batchSize;
 
+  private final Map<String, AsyncWriter> inflightWALClosures = new ConcurrentHashMap<>();
   private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
@@ -331,6 +336,30 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
+  @Override
+  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+    rollWriterLock.lock();
+    try {
+      Path currentPath = getOldPath();
+      if (path.equals(currentPath)) {
+        // Currently active path.
+        AsyncWriter writer = this.writer;
+        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
+      } else {
+        AsyncWriter temp = inflightWALClosures.get(path.getName());
+        if (temp != null) {
+          // Trailer is still being written.
+          return OptionalLong.of(temp.getSyncedLength());
+        }
+        // Log rolled successfully.
+        return OptionalLong.empty();
+      }
+    } finally {
+      rollWriterLock.unlock();
+    }
+
+  }
+
   private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
     highestSyncedTxid.set(processedTxid);
     for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
@@ -712,14 +741,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  protected final long closeWriter(AsyncWriter writer) {
+  protected final long closeWriter(AsyncWriter writer, Path path) {
     if (writer != null) {
+      inflightWALClosures.put(path.getName(), writer);
       long fileLength = writer.getLength();
       closeExecutor.execute(() -> {
         try {
           writer.close();
         } catch (IOException e) {
           LOG.warn("close old writer failed", e);
+        } finally {
+          inflightWALClosures.remove(path.getName());
         }
       });
       return fileLength;
@@ -733,7 +765,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       throws IOException {
     Preconditions.checkNotNull(nextWriter);
     waitForSafePoint();
-    long oldFileLen = closeWriter(this.writer);
+    long oldFileLen = closeWriter(this.writer, oldPath);
     logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
     this.writer = nextWriter;
     if (nextWriter instanceof AsyncProtobufLogWriter) {
@@ -759,7 +791,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   protected void doShutdown() throws IOException {
     waitForSafePoint();
-    closeWriter(this.writer);
+    closeWriter(this.writer, getOldPath());
     this.writer = null;
     closeExecutor.shutdown();
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 5507972ab4..ad9685ba33 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -876,7 +877,8 @@ public class TestWALEntryStream {
     assertNotNull(entryStream.next());
     log.rollWriter();
     appendToLogAndSync();
-    assertNotNull(entryStream.next());
+    // Wait for the previous log to move off of the queue, hence the wait.
+    Waiter.waitFor(CONF, 5000, (Waiter.Predicate<Exception>) () -> entryStream.next() != null);
     assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
   }
 }
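
A minimal sketch (not part of the patch) of how a tailing reader could consume the new getLogFileSizeIfBeingWritten() result. It assumes the method is exposed through the existing WALFileLengthProvider interface that AsyncFSWAL overrides here; the helper class and its names are hypothetical and only illustrate capping reads at the synced length while a close is still in flight:

    import java.util.OptionalLong;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.WALFileLengthProvider;

    // Hypothetical helper, for illustration only.
    final class SafeWalReadLength {
      // Returns how many bytes of the WAL at 'path' are safe to read: the synced
      // length while the file is active or its close is still in flight, otherwise
      // the final on-disk length once the roll has completed.
      static long get(WALFileLengthProvider provider, Path path, long onDiskLength) {
        OptionalLong beingWritten = provider.getLogFileSizeIfBeingWritten(path);
        return beingWritten.orElse(onDiskLength);
      }
    }

The point of tracking inflightWALClosures is exactly this: between the roll and the completion of the asynchronous close, the reader keeps seeing a bounded, synced length instead of racing the trailer write.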