diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index c1e8019..0ef0cf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.management.ManagementFactory;
@@ -55,6 +53,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -71,6 +70,8 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
  * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
@@ -104,6 +105,8 @@ public abstract class AbstractFSWAL implements WAL {
 
   protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
 
+  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+
   /**
    * file system instance
    */
@@ -162,6 +165,8 @@ public abstract class AbstractFSWAL implements WAL {
 
   protected final int slowSyncNs;
 
+  private final long walSyncTimeout;
+
   // If > than this size, roll the log.
   protected final long logrollsize;
 
@@ -381,6 +386,8 @@ public abstract class AbstractFSWAL implements WAL {
         + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
     this.slowSyncNs =
         1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS);
+    this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
+      DEFAULT_WAL_SYNC_TIMEOUT_MS);
     int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
     // Presize our map of SyncFutures by handler objects.
     this.syncFuturesByHandler = new ConcurrentHashMap(maxHandlersCount);
@@ -659,8 +666,14 @@ public abstract class AbstractFSWAL implements WAL {
   protected Span blockOnSync(final SyncFuture syncFuture) throws IOException {
     // Now we have published the ringbuffer, halt the current thread until we get an answer back.
     try {
-      syncFuture.get();
+      syncFuture.get(walSyncTimeout);
       return syncFuture.getSpan();
+    } catch (TimeoutIOException tioe) {
+      // SyncFutures are reused per handler thread. After a TimeoutIOException the ring buffer
+      // may still hold a reference to this future, so reusing it for this thread's next call
+      // could return a wrong result. Drop it so a fresh SyncFuture is allocated next time.
+      this.syncFuturesByHandler.remove(Thread.currentThread());
+      throw tioe;
     } catch (InterruptedException ie) {
       LOG.warn("Interrupted", ie);
       throw convertInterruptedExceptionToIOException(ie);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 142ab63..edf698e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1042,6 +1042,12 @@ public class FSHLog extends AbstractFSWAL {
       } catch (Exception e) {
         // Failed append. Record the exception.
         this.exception = e;
+        // Invoke cleanupOutstandingSyncsOnException when an append fails: it cleans up the
+        // sync requests already recorded in syncFutures but not yet offered to a SyncRunner,
+        // so no sync future is left dangling if no further truck is published to the disruptor.
+        cleanupOutstandingSyncsOnException(sequence,
+          this.exception instanceof DamagedWALException ? this.exception
+              : new DamagedWALException("On sync", this.exception));
         // Return to keep processing events coming off the ringbuffer
         return;
       } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index 5ec218a..6e302a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.htrace.Span;
 
 /**
@@ -96,6 +98,7 @@ class SyncFuture {
     this.doneTxid = NOT_DONE;
     this.txid = txid;
     this.span = span;
+    this.throwable = null;
     return this;
   }
 
@@ -154,9 +157,16 @@ class SyncFuture {
     throw new UnsupportedOperationException();
   }
 
-  synchronized long get() throws InterruptedException, ExecutionException {
+  synchronized long get(long timeout) throws InterruptedException,
+      ExecutionException, TimeoutIOException {
+    final long done = EnvironmentEdgeManager.currentTime() + timeout;
     while (!isDone()) {
       wait(1000);
+      if (EnvironmentEdgeManager.currentTime() >= done) {
+        throw new TimeoutIOException("Failed to get sync result after "
+            + timeout + " ms for txid=" + this.txid
+            + ", WAL system stuck?");
+      }
     }
     if (this.throwable != null) {
       throw new ExecutionException(this.throwable);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
new file mode 100644
index 0000000..2cba040
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestSyncFuture {
+
+  @Test(timeout = 60000)
+  public void testGet() throws Exception {
+    long timeout = 5000;
+    long txid = 100000;
+    SyncFuture syncFuture = new SyncFuture(txid, null);
+    syncFuture.done(txid, null);
+    assertEquals(txid, syncFuture.get(timeout));
+
+    syncFuture.reset(txid, null);
+    try {
+      syncFuture.get(timeout);
+      fail("Should have thrown TimeoutIOException but did not");
+    } catch (TimeoutIOException e) {
+      // test passed
+    }
+  }
+
+}
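
Note for reviewers: below is a minimal standalone sketch of the timed-wait pattern this patch introduces in SyncFuture.get(long): wake up roughly once a second, compare against a precomputed deadline, and fail with a timeout exception instead of blocking forever. The class and method names (SimpleSyncFuture, markDone) are illustrative stand-ins, not HBase APIs; the real patch uses EnvironmentEdgeManager.currentTime(), throws TimeoutIOException, and takes the timeout from hbase.regionserver.hlog.sync.timeout (default 5 minutes).

```java
import java.util.concurrent.TimeoutException;

/** Illustrative stand-in for the timed-wait loop added to SyncFuture.get(long). */
class SimpleSyncFuture {
  private static final long NOT_DONE = 0;
  private long doneTxid = NOT_DONE;

  /** Marks the future complete for the given transaction id and wakes any waiter. */
  synchronized void markDone(long txid) {
    this.doneTxid = txid;
    notifyAll();
  }

  /**
   * Waits up to timeoutMs for completion, polling once per second like the patched
   * SyncFuture.get(long). Throws TimeoutException (the patch uses TimeoutIOException)
   * if the deadline passes before the future is done.
   */
  synchronized long get(long timeoutMs) throws InterruptedException, TimeoutException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (doneTxid == NOT_DONE) {
      wait(1000); // wake up at least once a second to re-check the deadline
      if (System.currentTimeMillis() >= deadline) {
        throw new TimeoutException("No sync result after " + timeoutMs + " ms");
      }
    }
    return doneTxid;
  }

  public static void main(String[] args) throws Exception {
    SimpleSyncFuture f = new SimpleSyncFuture();
    new Thread(() -> {
      try {
        Thread.sleep(200);
      } catch (InterruptedException ignored) {
      }
      f.markDone(42);
    }).start();
    System.out.println("txid=" + f.get(5000)); // completes well before the 5 s deadline
  }
}
```

The other half of the fix is what happens after the timeout: blockOnSync removes the handler thread's cached SyncFuture from syncFuturesByHandler, because the ring buffer may still reference the timed-out future and reusing it could surface a stale result, and the FSHLog append path now calls cleanupOutstandingSyncsOnException so no recorded sync request is left waiting forever when an append fails.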