diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 3e0e829..142ab63 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -30,15 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.TimeoutException; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -68,6 +59,15 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import com.google.common.annotations.VisibleForTesting; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + /** * The default implementation of FSWAL. */ @@ -499,6 +499,7 @@ public class FSHLog extends AbstractFSWAL { private volatile long sequence; // Keep around last exception thrown. Clear on successful sync. private final BlockingQueue syncFutures; + private volatile SyncFuture takeSyncFuture = null; /** * UPDATE! 
@@ -546,6 +547,7 @@ public class FSHLog extends AbstractFSWAL { if (!syncFuture.done(currentSequence, t)) { throw new IllegalStateException(); } + // This function releases one sync future only. return 1; } @@ -589,13 +591,21 @@ public class FSHLog extends AbstractFSWAL { return sequence; } + boolean areSyncFuturesReleased() { + // True when no sync future is queued and none is in flight. NOTE(review): take() dequeues + // before takeSyncFuture is assigned, so this may transiently report true during hand-off. + return syncFutures.isEmpty() + && takeSyncFuture == null; + } + public void run() { long currentSequence; while (!isInterrupted()) { int syncCount = 0; - SyncFuture takeSyncFuture; + try { while (true) { + takeSyncFuture = null; // We have to process what we 'take' from the queue takeSyncFuture = this.syncFutures.take(); currentSequence = this.sequence; @@ -975,11 +985,23 @@ public class FSHLog extends AbstractFSWAL { * @return True if outstanding sync futures still */ private boolean isOutstandingSyncs() { + // Look at SyncFutures in the EventHandler for (int i = 0; i < this.syncFuturesCount; i++) { if (!this.syncFutures[i].isDone()) { return true; } } + + return false; + } + + private boolean isOutstandingSyncsFromRunners() { + // Look at SyncFutures in the SyncRunners + for (SyncRunner syncRunner : syncRunners) { + if (syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) { + return true; + } + } return false; } @@ -1095,11 +1117,13 @@ public class FSHLog extends AbstractFSWAL { // Wait on outstanding syncers; wait for them to finish syncing (unless we've been // shutdown or unless our latch has been thrown because we have been aborted or unless // this WAL is broken and we can't get a sync/append to complete). - while (!this.shutdown && this.zigzagLatch.isCocked() + while ((!this.shutdown && this.zigzagLatch.isCocked() && highestSyncedTxid.get() < currentSequence && // We could be in here and all syncs are failing or failed. Check for this. Otherwise // we'll just be stuck here for ever. 
In other words, ensure there syncs running. - isOutstandingSyncs()) { + isOutstandingSyncs()) + // Wait for all SyncRunners to finish their work so that we can replace the writer + || isOutstandingSyncsFromRunners()) { synchronized (this.safePointWaiter) { this.safePointWaiter.wait(0, 1); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index eda7df7..7412128 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -18,11 +18,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertFalse; - import java.io.IOException; import java.util.NavigableMap; import java.util.TreeMap; - +import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -56,7 +56,18 @@ public class TestLogRollingNoCluster { withLookingForStuckThread(true).build(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; - private static final int THREAD_COUNT = 100; // Spin up this many threads + private static final int NUM_THREADS = 100; // Spin up this many threads + private static final int NUM_ENTRIES = 100; // How many entries to write + + /** 
ProtobufLogWriter that simulates higher latencies in sync() call */ + public static class HighLatencySyncWriter extends ProtobufLogWriter { + @Override + public void sync() throws IOException { + Threads.sleep(ThreadLocalRandom.current().nextInt(10)); + super.sync(); + Threads.sleep(ThreadLocalRandom.current().nextInt(10)); + } + } /** * Spin up a bunch of threads and have them all append to a WAL. Roll the @@ -65,38 +76,42 @@ public class TestLogRollingNoCluster { * @throws InterruptedException */ @Test - public void testContendedLogRolling() throws IOException, InterruptedException { - Path dir = TEST_UTIL.getDataTestDir(); + public void testContendedLogRolling() throws Exception { + TEST_UTIL.startMiniDFSCluster(3); + Path dir = TEST_UTIL.getDataTestDirOnTestFS(); + // The implementation needs to know the 'handler' count. - TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT); + TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS); final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.set(WALFactory.WAL_PROVIDER, "filesystem"); FSUtils.setRootDir(conf, dir); + conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName()); final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName()); final WAL wal = wals.getWAL(new byte[]{}, null); Appender [] appenders = null; - final int count = THREAD_COUNT; - appenders = new Appender[count]; + final int numThreads = NUM_THREADS; + appenders = new Appender[numThreads]; try { - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { - // Have each appending thread write 'count' entries - appenders[i] = new Appender(wal, i, count); + // Have each appending thread write NUM_ENTRIES entries + appenders[i] = new Appender(wal, i, NUM_ENTRIES); } - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { appenders[i].start(); } - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { 
//ensure that all threads are joined before closing the wal appenders[i].join(); } } finally { wals.close(); + TEST_UTIL.shutdownMiniDFSCluster(); } - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { assertFalse(appenders[i].isException()); } } /** @@ -149,6 +164,7 @@ public class TestLogRollingNoCluster { } final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); + Threads.sleep(ThreadLocalRandom.current().nextInt(5)); wal.sync(txid); } String msg = getName() + " finished";