From 5a98b56bbd94e656806710c7a6cdc5435865e801 Mon Sep 17 00:00:00 2001
From: zhangduo <zhangduo@apache.org>
Date: Mon, 14 Nov 2016 11:02:30 +0800
Subject: [PATCH] HBASE-17085 AsyncFSWAL may issue unnecessary
 AsyncDFSOutput.sync

---
 .../hbase/regionserver/wal/AbstractFSWAL.java      | 39 ++++++------
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  | 64 ++++++++++------------
 .../hadoop/hbase/regionserver/wal/SyncFuture.java  | 21 +++----
 .../hbase/regionserver/wal/TestSyncFuture.java     | 14 +----
 4 files changed, 56 insertions(+), 82 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index bd3766d..aa57881 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -163,9 +165,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
   protected final DrainBarrier closeBarrier = new DrainBarrier();
 
-  protected final int slowSyncNs;
+  protected final long slowSyncNs;
 
-  private final long walSyncTimeout;
+  private final long walSyncTimeoutNs;
 
   // If > than this size, roll the log.
   protected final long logrollsize;
@@ -221,12 +223,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
    * an IllegalArgumentException if used to compare paths from different wals.
    */
-  final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
-    @Override
-    public int compare(Path o1, Path o2) {
-      return Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
-    }
-  };
+  final Comparator<Path> LOG_NAME_COMPARATOR =
+      (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
 
   private static final class WalProps {
 
@@ -258,7 +256,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   /**
    * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
    * <p>
-   * TODO: Reus FSWALEntry's rather than create them anew each time as we do SyncFutures here.
+   * TODO: Reuse FSWALEntry's rather than create them anew each time as we do SyncFutures here.
    * <p>
    * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them get
    * them from this Map?
@@ -400,10 +398,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
         StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
         walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
-    this.slowSyncNs =
-        1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS);
-    this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
-      DEFAULT_WAL_SYNC_TIMEOUT_MS);
+    this.slowSyncNs = TimeUnit.MILLISECONDS
+        .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
+    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
+        .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
     int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
     // Presize our map of SyncFutures by handler objects.
     this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
@@ -682,7 +680,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   protected Span blockOnSync(final SyncFuture syncFuture) throws IOException {
     // Now we have published the ringbuffer, halt the current thread until we get an answer back.
     try {
-      syncFuture.get(walSyncTimeout);
+      syncFuture.get(walSyncTimeoutNs);
       return syncFuture.getSpan();
     } catch (TimeoutIOException tioe) {
       // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
@@ -840,15 +838,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
   }
 
-  protected SyncFuture getSyncFuture(final long sequence, Span span) {
-    SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
-    if (syncFuture == null) {
-      syncFuture = new SyncFuture(sequence, span);
-      this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
-    } else {
-      syncFuture.reset(sequence, span);
-    }
-    return syncFuture;
+  protected SyncFuture getSyncFuture(long sequence, Span span) {
+    return CollectionUtils
+        .computeIfAbsent(syncFuturesByHandler, Thread.currentThread(), SyncFuture::new)
+        .reset(sequence, span);
   }
 
   protected void requestLogRoll(boolean tooFewReplicas) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 78a3e8a..60b73ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -35,8 +35,9 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.Queue;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,8 +110,8 @@ import org.apache.htrace.TraceScope;
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
- * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends}
- * out.</li>
+ * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
+ * </li>
 * <li>If there are unflush data in the writer, sync them.</li>
 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
@@ -179,8 +180,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();
 
-  private final PriorityQueue<SyncFuture> syncFutures =
-      new PriorityQueue<SyncFuture>(11, SEQ_COMPARATOR);
+  private final SortedSet<SyncFuture> syncFutures = new TreeSet<SyncFuture>(SEQ_COMPARATOR);
 
   // the highest txid of WAL entries being processed
   private long highestProcessedAppendTxid;
@@ -188,6 +188,8 @@
   // file length when we issue last sync request on the writer
   private long fileLengthAtLastSync;
 
+  private long highestProcessedAppendTxidAtLastSync;
+
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix, EventLoop eventLoop)
@@ -321,14 +323,16 @@
     }
   }
 
-  private void sync(final AsyncWriter writer, final long processedTxid) {
+  private void sync(AsyncWriter writer) {
     fileLengthAtLastSync = writer.getLength();
+    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
+    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
     final long startTimeNs = System.nanoTime();
     writer.sync().whenComplete((result, error) -> {
       if (error != null) {
         syncFailed(error);
       } else {
-        syncCompleted(writer, processedTxid, startTimeNs);
+        syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
       }
     });
   }
@@ -341,10 +345,11 @@
 
   private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
     int finished = 0;
-    for (SyncFuture sync; (sync = syncFutures.peek()) != null;) {
+    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
+      SyncFuture sync = iter.next();
       if (sync.getTxid() <= txid) {
         sync.done(txid, null);
-        syncFutures.remove();
+        iter.remove();
         finished++;
         if (addSyncTrace) {
           addTimeAnnotation(sync, "writer synced");
@@ -392,7 +397,7 @@
     // maybe a sync request is not queued when we issue a sync, so check here to see if we could
     // finish some.
     finishSync(false);
-    long newHighestProcessedTxid = -1L;
+    long newHighestProcessedAppendTxid = -1L;
     for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
       FSWALEntry entry = iter.next();
       boolean appended;
@@ -415,7 +420,7 @@
           throw new AssertionError("should not happen", e);
         }
       }
-      newHighestProcessedTxid = entry.getTxid();
+      newHighestProcessedAppendTxid = entry.getTxid();
       iter.remove();
       if (appended) {
         unackedAppends.addLast(entry);
@@ -426,42 +431,32 @@
     }
     // if we have a newer transaction id, update it.
     // otherwise, use the previous transaction id.
-    if (newHighestProcessedTxid > 0) {
-      highestProcessedAppendTxid = newHighestProcessedTxid;
+    if (newHighestProcessedAppendTxid > 0) {
+      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
     } else {
-      newHighestProcessedTxid = highestProcessedAppendTxid;
+      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
     }
     if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
       // sync because buffer size limit.
-      sync(writer, newHighestProcessedTxid);
+      sync(writer);
       return;
     }
     if (writer.getLength() == fileLengthAtLastSync) {
       // we haven't written anything out, just advance the highestSyncedSequence since we may only
       // stamped some region sequence id.
-      highestSyncedTxid.set(newHighestProcessedTxid);
-      finishSync(false);
-      trySetReadyForRolling();
+      if (unackedAppends.isEmpty()) {
+        highestSyncedTxid.set(highestProcessedAppendTxid);
+        finishSync(false);
+        trySetReadyForRolling();
+      }
       return;
     }
     // we have some unsynced data but haven't reached the batch size yet
-    if (!syncFutures.isEmpty()) {
+    if (!syncFutures.isEmpty()
+        && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
       // we have at least one sync request
-      sync(writer, newHighestProcessedTxid);
-      return;
-    }
-    // usually waitingRoll is false so we check it without lock first.
-    if (waitingRoll) {
-      consumeLock.lock();
-      try {
-        if (waitingRoll) {
-          // there is a roll request
-          sync(writer, newHighestProcessedTxid);
-        }
-      } finally {
-        consumeLock.unlock();
-      }
+      sync(writer);
     }
   }
 
@@ -474,7 +469,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     if (waitingRoll) {
       if (writer.getLength() > fileLengthAtLastSync) {
         // issue a sync
-        sync(writer, highestProcessedAppendTxid);
+        sync(writer);
       } else {
         if (unackedAppends.isEmpty()) {
           readyForRolling = true;
@@ -673,6 +668,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
     }
     this.fileLengthAtLastSync = 0L;
+    this.highestProcessedAppendTxidAtLastSync = 0L;
     consumeLock.lock();
     try {
       consumerScheduled.set(true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index 6e302a3..bc2e62e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.htrace.Span;
 
 /**
@@ -73,13 +73,6 @@ class SyncFuture {
    */
   private Span span;
 
-  SyncFuture(long txid, Span span) {
-    this.t = Thread.currentThread();
-    this.txid = txid;
-    this.span = span;
-    this.doneTxid = NOT_DONE;
-  }
-
   /**
    * Call this method to clear old usage and get it ready for new deploy.
   * @param txid the new transaction id
@@ -157,15 +150,15 @@ class SyncFuture {
     throw new UnsupportedOperationException();
   }
 
-  synchronized long get(long timeout) throws InterruptedException,
+  synchronized long get(long timeoutNs) throws InterruptedException,
       ExecutionException, TimeoutIOException {
-    final long done = EnvironmentEdgeManager.currentTime() + timeout;
+    final long done = System.nanoTime() + timeoutNs;
     while (!isDone()) {
       wait(1000);
-      if (EnvironmentEdgeManager.currentTime() >= done) {
-        throw new TimeoutIOException("Failed to get sync result after "
-            + timeout + " ms for txid=" + this.txid
-            + ", WAL system stuck?");
+      if (System.nanoTime() >= done) {
+        throw new TimeoutIOException(
+            "Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+                + " ms for txid=" + this.txid + ", WAL system stuck?");
       }
     }
     if (this.throwable != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
index 2cba040..50825f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -29,21 +28,14 @@ import org.junit.experimental.categories.Category;
 @Category({ RegionServerTests.class, SmallTests.class })
 public class TestSyncFuture {
 
-  @Test(timeout = 60000)
+  @Test(expected = TimeoutIOException.class)
  public void testGet() throws Exception {
     long timeout = 5000;
     long txid = 100000;
-    SyncFuture syncFulture = new SyncFuture(txid, null);
+    SyncFuture syncFulture = new SyncFuture().reset(txid, null);
     syncFulture.done(txid, null);
     assertEquals(txid, syncFulture.get(timeout));
 
-    syncFulture.reset(txid, null);
-    try {
-      syncFulture.get(timeout);
-      fail("Should have timed out but not");
-    } catch (TimeoutIOException e) {
-      // test passed
-    }
+    syncFulture.reset(txid, null).get(timeout);
   }
-
 }
-- 
2.7.4
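
Note (illustration only, not part of the patch): the sketch below models the bookkeeping this change introduces. AsyncFSWAL now remembers the highest append txid covered by the last issued sync (highestProcessedAppendTxidAtLastSync) and skips issuing another AsyncDFSOutput.sync when no pending SyncFuture wants anything newer than that. Keeping the pending requests sorted makes this check a cheap last() lookup, which is presumably why syncFutures moved from a PriorityQueue (cheap peek() of the oldest element only) to a TreeSet. All names in the sketch (SyncSkipModel, requestSync, and so on) are invented for illustration and are not HBase APIs.

import java.util.SortedSet;
import java.util.TreeSet;

/** Illustration only -- a toy model of the sync-skipping logic in this patch. */
public class SyncSkipModel {

  // Stand-ins for pending SyncFutures, keyed by the txid each waiter wants durable.
  private final SortedSet<Long> pendingSyncTxids = new TreeSet<>();

  // Mirrors highestProcessedAppendTxidAtLastSync in the patch.
  private long txidCoveredByLastSync = 0;

  /** A handler asks for everything up to txid to be made durable. */
  void requestSync(long txid) {
    pendingSyncTxids.add(txid);
  }

  /** Records what the sync we are about to issue will cover (real code calls writer.sync()). */
  void issueSync(long highestProcessedAppendTxid) {
    txidCoveredByLastSync = highestProcessedAppendTxid;
  }

  /** Mirrors finishSyncLowerThanTxid: complete and drop every waiter at or below txid. */
  void finishSyncLowerThanTxid(long txid) {
    pendingSyncTxids.removeIf(t -> t <= txid); // the patch walks an Iterator over the TreeSet
  }

  /**
   * The patch's new condition: only issue a sync if some waiter wants a txid newer than
   * what the last issued sync already covers; otherwise that in-flight sync will satisfy
   * every pending waiter and another sync would be wasted I/O on the DataNode pipeline.
   */
  boolean shouldSync() {
    return !pendingSyncTxids.isEmpty() && pendingSyncTxids.last() > txidCoveredByLastSync;
  }

  public static void main(String[] args) {
    SyncSkipModel model = new SyncSkipModel();
    model.requestSync(10);
    System.out.println(model.shouldSync()); // true: txid 10 is not covered yet
    model.issueSync(10);
    model.finishSyncLowerThanTxid(10);
    model.requestSync(8); // a late waiter for an already-covered txid
    System.out.println(model.shouldSync()); // false: the sync at txid 10 already covers it
  }
}

Separately, note the SyncFuture.get() change above: the timeout deadline is now computed with System.nanoTime(), a monotonic clock (with the configured milliseconds converted via TimeUnit), instead of wall-clock time, so a system clock step can no longer fire or postpone the "WAL system stuck?" timeout spuriously.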