diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index a8b0372..dd665eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -164,6 +165,8 @@ public class FSHLog implements WAL {
private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+ private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+
/**
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
* Appends and syncs are each put on the ring which means handlers need to
@@ -281,6 +284,8 @@ public class FSHLog implements WAL {
private final int slowSyncNs;
+ private final long walSyncTimeout;
+
// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
@@ -535,6 +540,8 @@ public class FSHLog implements WAL {
this.slowSyncNs =
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
DEFAULT_SLOW_SYNC_TIME_MS);
+ this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
+ DEFAULT_WAL_SYNC_TIMEOUT_MS);
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
@@ -1400,8 +1407,14 @@ public class FSHLog implements WAL {
private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
// Now we have published the ringbuffer, halt the current thread until we get an answer back.
try {
- syncFuture.get();
+ syncFuture.get(walSyncTimeout);
return syncFuture.getSpan();
+ } catch (TimeoutIOException tioe) {
+ // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
+ // still refer to it, so if this thread use it next time may get a wrong
+ // result.
+ this.syncFuturesByHandler.remove(Thread.currentThread());
+ throw tioe;
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
throw convertInterruptedExceptionToIOException(ie);
@@ -1806,6 +1819,12 @@ public class FSHLog implements WAL {
} catch (Exception e) {
// Failed append. Record the exception.
this.exception = e;
+ // invoking cleanupOutstandingSyncsOnException when append failed with exception,
+ // it will cleanup existing sync requests recorded in syncFutures but not offered to SyncRunner yet,
+ // so there won't be any sync future left over if no further truck published to disruptor.
+ cleanupOutstandingSyncsOnException(sequence,
+ this.exception instanceof DamagedWALException ? this.exception
+ : new DamagedWALException("On sync", this.exception));
// Return to keep processing events coming off the ringbuffer
return;
} finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index 7de8367..0720de8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -21,6 +21,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Span;
/**
@@ -105,6 +107,7 @@ class SyncFuture {
this.doneSequence = NOT_DONE;
this.ringBufferSequence = sequence;
this.span = span;
+ this.throwable = null;
return this;
}
@@ -162,9 +165,16 @@ class SyncFuture {
throw new UnsupportedOperationException();
}
- public synchronized long get() throws InterruptedException, ExecutionException {
+ public synchronized long get(long timeout) throws InterruptedException,
+ ExecutionException, TimeoutIOException {
+ final long done = EnvironmentEdgeManager.currentTime() + timeout;
while (!isDone()) {
wait(1000);
+ if (EnvironmentEdgeManager.currentTime() >= done) {
+ throw new TimeoutIOException("Failed to get sync result after "
+ + timeout + " ms for ringBufferSequence=" + this.ringBufferSequence
+ + ", WAL system stuck?");
+ }
}
if (this.throwable != null) throw new ExecutionException(this.throwable);
return this.doneSequence;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 56c6c60..d82d1df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -31,14 +30,21 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.DamagedWALException;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -47,6 +53,8 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -282,6 +290,281 @@ public class TestWALLockup {
}
/**
+ * Reproduce locking up that happens when there's no further syncs after
+ * append fails, and causing an isolated sync then infinite wait. See
+ * HBASE-16960. If below is broken, we will see this test timeout because it
+ * is locked up.
+ *
+ * Steps for reproduce:
+ * 1. Trigger server abort through dodgyWAL1
+ * 2. Add a {@link DummyWALActionsListener} to dodgyWAL2 to cause ringbuffer
+ * event handler thread sleep for a while thus keeping {@code endOfBatch}
+ * false
+ * 3. Publish a sync then an append which will throw exception, check whether
+ * the sync could return
+ */
+ @Test(timeout = 20000)
+ public void testLockup16960() throws IOException {
+ // A WAL that we can have throw exceptions when a flag is set.
+ class DodgyFSLog extends FSHLog {
+ // Set this when want the WAL to start throwing exceptions.
+ volatile boolean throwException = false;
+
+ public DodgyFSLog(FileSystem fs, Path root, String logDir,
+ Configuration conf) throws IOException {
+ super(fs, root, logDir, conf);
+ }
+
+ @Override
+ protected Writer createWriterInstance(Path path) throws IOException {
+ final Writer w = super.createWriterInstance(path);
+ return new Writer() {
+ @Override
+ public void close() throws IOException {
+ w.close();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ if (throwException) {
+ throw new IOException(
+ "FAKE! Failed to replace a bad datanode...SYNC");
+ }
+ w.sync();
+ }
+
+ @Override
+ public void append(Entry entry) throws IOException {
+ if (throwException) {
+ throw new IOException(
+ "FAKE! Failed to replace a bad datanode...APPEND");
+ }
+ w.append(entry);
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return w.getLength();
+ }
+ };
+ }
+
+ @Override
+ public byte[][] rollWriter(boolean force) throws FailedLogCloseException,
+ IOException {
+ if (throwException) {
+ throw new FailedLogCloseException("testLockup16960");
+ }
+ return super.rollWriter(force);
+ }
+ }
+
+ // Mocked up server and regionserver services. Needed below.
+ Server server = new DummyServer(CONF, ServerName.valueOf(
+ "hostname1.example.org", 1234, 1L).toString());
+ RegionServerServices services = Mockito.mock(RegionServerServices.class);
+
+ CONF.setLong("hbase.regionserver.hlog.sync.timeout", 10000);
+
+ // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL,
+ // go ahead with test.
+ FileSystem fs = FileSystem.get(CONF);
+ Path rootDir = new Path(dir + getName());
+ DodgyFSLog dodgyWAL1 = new DodgyFSLog(fs, rootDir, getName(), CONF);
+
+ Path rootDir2 = new Path(dir + getName() + "2");
+ final DodgyFSLog dodgyWAL2 = new DodgyFSLog(fs, rootDir2, getName() + "2",
+ CONF);
+ // Add a listener to force ringbuffer event handler sleep for a while
+ dodgyWAL2.registerWALActionsListener(new DummyWALActionsListener());
+
+ // I need a log roller running.
+ LogRoller logRoller = new LogRoller(server, services);
+ logRoller.addWAL(dodgyWAL1);
+ logRoller.addWAL(dodgyWAL2);
+ // There is no 'stop' once a logRoller is running.. it just dies.
+ logRoller.start();
+ // Now get a region and start adding in edits.
+ HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
+ final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
+ byte[] bytes = Bytes.toBytes(getName());
+
+ try {
+ Put put = new Put(bytes);
+ put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
+ WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
+ htd.getTableName());
+ WALEdit edit = new WALEdit();
+ CellScanner CellScanner = put.cellScanner();
+ assertTrue(CellScanner.advance());
+ edit.add(CellScanner.current());
+
+ LOG.info("SET throwing of exception on append");
+ dodgyWAL1.throwException = true;
+ // This append provokes a WAL roll request
+ dodgyWAL1.append(htd, region.getRegionInfo(), key, edit, true);
+ boolean exception = false;
+ try {
+ dodgyWAL1.sync();
+ } catch (Exception e) {
+ exception = true;
+ }
+ assertTrue("Did not get sync exception", exception);
+
+ // LogRoller call dodgyWAL1.rollWriter get FailedLogCloseException and
+ // cause server abort.
+ try {
+ // wait LogRoller exit.
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // make RingBufferEventHandler sleep 1s, so the following sync
+ // endOfBatch=false
+ key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
+ TableName.valueOf("sleep"));
+ dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
+
+ Thread t = new Thread("Sync") {
+ public void run() {
+ try {
+ dodgyWAL2.sync();
+ } catch (IOException e) {
+ LOG.info("In sync", e);
+ }
+ latch.countDown();
+ LOG.info("Sync exiting");
+ };
+ };
+ t.setDaemon(true);
+ t.start();
+ try {
+ // make sure sync have published.
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ // make append throw DamagedWALException
+ key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
+ TableName.valueOf("DamagedWALException"));
+ dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
+
+ while (latch.getCount() > 0) {
+ Threads.sleep(100);
+ }
+ assertTrue(server.isAborted());
+ } finally {
+ if (logRoller != null) {
+ logRoller.interrupt();
+ }
+ try {
+ if (region != null) {
+ region.close();
+ }
+ if (dodgyWAL1 != null) {
+ dodgyWAL1.close();
+ }
+ if (dodgyWAL2 != null) {
+ dodgyWAL2.close();
+ }
+ } catch (Exception e) {
+ LOG.info("On way out", e);
+ }
+ }
+ }
+
+ static class DummyServer implements Server {
+ private Configuration conf;
+ private String serverName;
+ private boolean isAborted = false;
+
+ public DummyServer(Configuration conf, String serverName) {
+ this.conf = conf;
+ this.serverName = serverName;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return null;
+ }
+
+ @Override
+ public CoordinatedStateManager getCoordinatedStateManager() {
+ return null;
+ }
+
+ @Override
+ public ClusterConnection getConnection() {
+ return null;
+ }
+
+ @Override
+ public MetaTableLocator getMetaTableLocator() {
+ return null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return ServerName.valueOf(this.serverName);
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.info("Aborting " + serverName);
+ this.isAborted = true;
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.isAborted;
+ }
+
+ @Override
+ public void stop(String why) {
+ this.isAborted = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.isAborted;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return null;
+ }
+
+ }
+
+ static class DummyWALActionsListener extends WALActionsListener.Base {
+
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
+ WALEdit logEdit) throws IOException {
+ if (logKey.getTablename().getNameAsString().equalsIgnoreCase("sleep")) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (logKey.getTablename().getNameAsString()
+ .equalsIgnoreCase("DamagedWALException")) {
+ throw new DamagedWALException("Failed appending");
+ }
+ }
+
+ }
+
+ /**
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
new file mode 100644
index 0000000..851ded4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestSyncFuture {
+
+ @Test(timeout = 60000)
+ public void testGet() throws Exception {
+ long timeout = 5000;
+ long txid = 100000;
+ SyncFuture syncFulture = new SyncFuture();
+ syncFulture.reset(txid);
+ syncFulture.done(txid, null);
+ assertEquals(txid, syncFulture.get(timeout));
+
+ syncFulture.reset(txid);
+ try {
+ syncFulture.get(timeout);
+ fail("Should have timed out but not");
+ } catch (TimeoutIOException e) {
+ // test passed
+ }
+ }
+
+}