From 6eb7d0338f988f43dc1bdf981642b37fb00b5cca Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 6 Feb 2018 16:48:17 +0800 Subject: [PATCH] HBASE-19929 Call RS.stop on a session expired RS may hang --- .../org/apache/hadoop/hbase/util/DrainBarrier.java | 134 -------------------- .../apache/hadoop/hbase/util/TestDrainBarrier.java | 127 ------------------- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hadoop/hbase/regionserver/LogRoller.java | 40 ++++-- .../hbase/regionserver/wal/AbstractFSWAL.java | 45 ++----- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 42 +++--- .../regionserver/TestShutdownWhileWALBroken.java | 141 +++++++++++++++++++++ 7 files changed, 207 insertions(+), 324 deletions(-) delete mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java delete mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java deleted file mode 100644 index b64ebdf..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.util; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.yetus.audience.InterfaceAudience; - -/** - * A simple barrier that can be used by classes that need to wait for some operations to - * finish before stopping/closing/etc. forever. - */ -@InterfaceAudience.Private -public class DrainBarrier { - /** - * Contains the number of outstanding operations, as well as flags. - * Initially, the number of operations is 1. Each beginOp increments, and endOp decrements it. - * beginOp does not proceed when it sees the draining flag. When stop is called, it atomically - * decrements the number of operations (the initial 1) and sets the draining flag. If stop did - * the decrement to zero, that means there are no more operations outstanding, so stop is done. - * Otherwise, stop blocks, and the endOp that decrements the count to 0 unblocks it. - */ - private final AtomicLong valueAndFlags = new AtomicLong(inc(0)); - private final static long DRAINING_FLAG = 0x1; - private final static int FLAG_BIT_COUNT = 1; - - /** - * Tries to start an operation. - * @return false iff the stop is in progress, and the operation cannot be started. - */ - public boolean beginOp() { - long oldValAndFlags; - do { - oldValAndFlags = valueAndFlags.get(); - if (isDraining(oldValAndFlags)) return false; - } while (!valueAndFlags.compareAndSet(oldValAndFlags, inc(oldValAndFlags))); - return true; - } - - /** - * Ends the operation. Unblocks the blocked caller of stop, if necessary. 
- */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", - justification="First, we do change the state before notify, 2nd, it doesn't even matter") - public void endOp() { - long oldValAndFlags; - do { - oldValAndFlags = valueAndFlags.get(); - long unacceptableCount = isDraining(oldValAndFlags) ? 0 : 1; - if (getValue(oldValAndFlags) == unacceptableCount) { - throw new AssertionError("endOp called without corresponding beginOp call (" - + "the current count is " + unacceptableCount + ")"); - } - } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags))); - if (getValue(oldValAndFlags) == 1) { - synchronized (this) { this.notifyAll(); } - } - } - - /** - * Blocks new operations from starting, waits for the current ones to drain. - * If someone already called it, returns immediately, which is currently unavoidable as - * most of the users stop and close things right and left, and hope for the best. - * stopAndWaitForOpsOnce asserts instead. - * @throws InterruptedException the wait for operations has been interrupted. - */ - public void stopAndDrainOps() throws InterruptedException { - stopAndDrainOps(true); - } - - /** - * Blocks new operations from starting, waits for the current ones to drain. - * Can only be called once. - * @throws InterruptedException the wait for operations has been interrupted. - */ - public void stopAndDrainOpsOnce() throws InterruptedException { - stopAndDrainOps(false); - } - - /** - * @param ignoreRepeatedCalls If this is true and somebody already called stop, this method - * will return immediately if true; if this is false and somebody - * already called stop, it will assert. - */ - // Justification for warnings - wait is not unconditional, and contrary to what WA_NOT_IN_LOOP - // description says we are not waiting on multiple conditions. 
- @edu.umd.cs.findbugs.annotations.SuppressWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"}) - private void stopAndDrainOps(boolean ignoreRepeatedCalls) throws InterruptedException { - long oldValAndFlags; - do { - oldValAndFlags = valueAndFlags.get(); - if (isDraining(oldValAndFlags)) { - if (ignoreRepeatedCalls) return; - throw new AssertionError("stopAndWaitForOpsOnce called more than once"); - } - } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags) | DRAINING_FLAG)); - if (getValue(oldValAndFlags) == 1) return; // There were no operations outstanding. - synchronized (this) { this.wait(); } - } - - // Helper methods. - private static final boolean isDraining(long valueAndFlags) { - return (valueAndFlags & DRAINING_FLAG) == DRAINING_FLAG; - } - - private static final long getValue(long valueAndFlags) { - return valueAndFlags >> FLAG_BIT_COUNT; - } - - private static final long inc(long valueAndFlags) { - return valueAndFlags + (1 << FLAG_BIT_COUNT); // Not checking for overflow. - } - - private static final long dec(long valueAndFlags) { - return valueAndFlags - (1 << FLAG_BIT_COUNT); // Negative overflow checked outside. - } -} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java deleted file mode 100644 index 5c3d053..0000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({MiscTests.class, SmallTests.class}) -public class TestDrainBarrier { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestDrainBarrier.class); - - @Test - public void testBeginEndStopWork() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - assertTrue(barrier.beginOp()); - assertTrue(barrier.beginOp()); - barrier.endOp(); - barrier.endOp(); - barrier.stopAndDrainOps(); - assertFalse(barrier.beginOp()); - } - - @Test - public void testUnmatchedEndAssert() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - try { - barrier.endOp(); - throw new Error("Should have asserted"); - } catch (AssertionError e) { - } - - barrier.beginOp(); - barrier.beginOp(); - barrier.endOp(); - barrier.endOp(); - try { - barrier.endOp(); - throw new Error("Should have asserted"); - } catch (AssertionError e) { - } - } - - @Test - public void testStopWithoutOpsDoesntBlock() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - barrier.stopAndDrainOpsOnce(); - - barrier = new DrainBarrier(); - barrier.beginOp(); - 
barrier.endOp(); - barrier.stopAndDrainOpsOnce(); - } - - @Test - /** This test tests blocking and can have false positives in very bad timing cases. */ - public void testStopIsBlockedByOps() throws Exception { - final DrainBarrier barrier = new DrainBarrier(); - barrier.beginOp(); - barrier.beginOp(); - barrier.beginOp(); - barrier.endOp(); - - Thread stoppingThread = new Thread(new Runnable() { - @Override - public void run() { - try { - barrier.stopAndDrainOpsOnce(); - } catch (InterruptedException e) { - fail("Should not have happened"); - } - } - }); - stoppingThread.start(); - - // First "end" should not unblock the thread, but the second should. - barrier.endOp(); - stoppingThread.join(1000); - assertTrue(stoppingThread.isAlive()); - barrier.endOp(); - stoppingThread.join(30000); // When not broken, will be a very fast wait; set safe value. - assertFalse(stoppingThread.isAlive()); - } - - @Test - public void testMultipleStopOnceAssert() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - barrier.stopAndDrainOpsOnce(); - try { - barrier.stopAndDrainOpsOnce(); - throw new Error("Should have asserted"); - } catch (AssertionError e) { - } - } - - @Test - public void testMultipleSloppyStopsHaveNoEffect() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - barrier.stopAndDrainOps(); - barrier.stopAndDrainOps(); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3a93c76..0d59b12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1181,7 +1181,7 @@ public class HRegionServer extends HasThread implements @VisibleForTesting protected void tryRegionServerReport(long reportStartTime, long reportEndTime) - throws IOException { + throws IOException { 
RegionServerStatusService.BlockingInterface rss = rssStub; if (rss == null) { // the current server could be stopping. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 7a247cf..55c5219 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -132,6 +132,23 @@ public class LogRoller extends HasThread implements Closeable { } } + private void abort(String reason, Throwable cause) { + // close all WALs before calling abort on RS. + // This is because AsyncFSWAL relies on us for rolling a new writer to make progress, and if we + // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it + // is already broken. + for (WAL wal : walNeedsRoll.keySet()) { + // shutdown rather than close here since we are going to abort the RS and the wals need to be + // split during recovery + try { + wal.shutdown(); + } catch (IOException e) { + LOG.warn("Failed to shutdown wal", e); + } + } + server.abort(reason, cause); + } + @Override public void run() { while (running) { @@ -153,10 +170,8 @@ public class LogRoller extends HasThread implements Closeable { continue; } // Time for periodic roll - if (LOG.isDebugEnabled()) { - LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed"); - } - } else if (LOG.isDebugEnabled()) { + LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); + } else { LOG.debug("WAL roll requested"); } rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH @@ -170,20 +185,22 @@ public class LogRoller extends HasThread implements Closeable { entry.getValue().booleanValue()); walNeedsRoll.put(wal, Boolean.FALSE); if (regionsToFlush != null) { - for (byte [] r: regionsToFlush) scheduleFlush(r); + for (byte[] r : regionsToFlush) { + scheduleFlush(r); + } } } } catch 
(FailedLogCloseException e) { - server.abort("Failed log close in log roller", e); + abort("Failed log close in log roller", e); } catch (java.net.ConnectException e) { - server.abort("Failed log close in log roller", e); + abort("Failed log close in log roller", e); } catch (IOException ex) { // Abort if we get here. We probably won't recover an IOE. HBASE-1132 - server.abort("IOE in log roller", + abort("IOE in log roller", ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex); } catch (Exception ex) { LOG.error("Log rolling failed", ex); - server.abort("Log rolling failed", ex); + abort("Log rolling failed", ex); } finally { try { rollLog.set(false); @@ -211,9 +228,8 @@ public class LogRoller extends HasThread implements Closeable { } } if (!scheduled) { - LOG.warn("Failed to schedule flush of " + - Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" + - requester); + LOG.warn("Failed to schedule flush of {}, region={}, requester={}", + Bytes.toString(encodedRegionName), r, requester); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 54a5cd3..14fbe10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -17,12 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import com.lmax.disruptor.RingBuffer; - import java.io.FileNotFoundException; import 
java.io.IOException; import java.io.InterruptedIOException; @@ -46,7 +45,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; - import org.apache.commons.lang3.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -66,7 +64,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -84,6 +81,7 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -173,9 +171,6 @@ public abstract class AbstractFSWAL implements WAL { */ protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); - /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. 
*/ - protected final DrainBarrier closeBarrier = new DrainBarrier(); - protected final long slowSyncNs; private final long walSyncTimeoutNs; @@ -452,32 +447,22 @@ public abstract class AbstractFSWAL implements WAL { @Override public Long startCacheFlush(byte[] encodedRegionName, Set families) { - if (!closeBarrier.beginOp()) { - LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); - return null; - } return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); } @Override public Long startCacheFlush(byte[] encodedRegionName, Map familyToSeq) { - if (!closeBarrier.beginOp()) { - LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); - return null; - } return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); } @Override public void completeCacheFlush(byte[] encodedRegionName) { this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); - closeBarrier.endOp(); } @Override public void abortCacheFlush(byte[] encodedRegionName) { this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); - closeBarrier.endOp(); } @Override @@ -715,7 +700,11 @@ public abstract class AbstractFSWAL implements WAL { // Now we have published the ringbuffer, halt the current thread until we get an answer back. try { if (syncFuture != null) { - syncFuture.get(walSyncTimeoutNs); + if (closed) { + throw new IOException("WAL has been closed"); + } else { + syncFuture.get(walSyncTimeoutNs); + } } } catch (TimeoutIOException tioe) { // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer @@ -755,10 +744,6 @@ public abstract class AbstractFSWAL implements WAL { LOG.debug("WAL closed. Skipping rolling of writer"); return regionsToFlush; } - if (!closeBarrier.beginOp()) { - LOG.debug("WAL closing. 
Skipping rolling of writer"); - return regionsToFlush; - } try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) { Path oldPath = getOldPath(); Path newPath = getNewPath(); @@ -783,8 +768,6 @@ public abstract class AbstractFSWAL implements WAL { throw new IOException( "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", exception); - } finally { - closeBarrier.endOp(); } return regionsToFlush; } finally { @@ -818,20 +801,18 @@ public abstract class AbstractFSWAL implements WAL { return; } closed = true; - try { - // Prevent all further flushing and rolling. - closeBarrier.stopAndDrainOps(); - } catch (InterruptedException e) { - LOG.error("Exception while waiting for cache flushes and log rolls", e); - Thread.currentThread().interrupt(); - } // Tell our listeners that the log is closing if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { i.logCloseRequested(); } } - doShutdown(); + rollWriterLock.lock(); + try { + doShutdown(); + } finally { + rollWriterLock.unlock(); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index faf3b77..19d89df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -608,19 +608,8 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { - try { - return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup, - channelClass); - } catch (IOException e) { - // this usually means master already think we are dead so let's fail all the pending - // syncs. 
The shutdown process of RS will wait for all regions to be closed before calling - // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead - // lock. - if (e.getMessage().contains("Parent directory doesn't exist:")) { - syncFutures.forEach(f -> f.done(f.getTxid(), e)); - } - throw e; - } + return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup, + channelClass); } private void waitForSafePoint() { @@ -675,17 +664,34 @@ public class AsyncFSWAL extends AbstractFSWAL { closeExecutor.shutdown(); try { if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { - LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" - + " the close of async writer doesn't complete." - + "Please check the status of underlying filesystem" - + " or increase the wait time by the config \"" - + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\""); + LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + + " the close of async writer doesn't complete." 
+ + "Please check the status of underlying filesystem" + + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + + "\""); } } catch (InterruptedException e) { LOG.error("The wait for close of async writer is interrupted"); Thread.currentThread().interrupt(); } IOException error = new IOException("WAL has been closed"); + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + // drain all the pending sync requests + for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; + nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; + } + RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case SYNC: + syncFutures.add(truck.unloadSync()); + break; + default: + break; + } + } + // and fail them syncFutures.forEach(f -> f.done(f.getTxid(), error)); if (!(consumeExecutor instanceof EventLoop)) { consumeExecutor.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java new file mode 100644 index 0000000..1346c03 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * See HBASE-19929 for more details. 
+ */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestShutdownWhileWALBroken { + + private static final Logger LOG = LoggerFactory.getLogger(TestShutdownWhileWALBroken.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("TestShutdownWhileWALBroken"); + + private static byte[] CF = Bytes.toBytes("CF"); + + public static final class MyRegionServer extends HRegionServer { + + private final CountDownLatch latch = new CountDownLatch(1); + + public MyRegionServer(Configuration conf) throws IOException { + super(conf); + } + + @Override + protected void tryRegionServerReport(long reportStartTime, long reportEndTime) + throws IOException { + try { + super.tryRegionServerReport(reportStartTime, reportEndTime); + } catch (YouAreDeadException e) { + LOG.info("Caught YouAreDeadException, ignore", e); + } + } + + @Override + public void abort(String reason, Throwable cause) { + if (cause instanceof SessionExpiredException) { + // called from ZKWatcher, let's wait a bit to make sure that we call stop before calling + // abort. + try { + latch.await(); + } catch (InterruptedException e) { + } + } else { + // abort from other classes, usually LogRoller, now we can make progress on abort. 
+ latch.countDown(); + } + super.abort(reason, cause); + } + + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class, + HRegionServer.class); + UTIL.startMiniCluster(2); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + UTIL.createMultiRegionTable(TABLE_NAME, CF); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + UTIL.loadTable(table, CF); + } + int numRegions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size(); + RegionServerThread rst0 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(0); + RegionServerThread rst1 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(1); + HRegionServer liveRS; + RegionServerThread toKillRSThread; + if (rst1.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty()) { + liveRS = rst0.getRegionServer(); + toKillRSThread = rst1; + } else { + liveRS = rst1.getRegionServer(); + toKillRSThread = rst0; + } + assertTrue(liveRS.getRegions(TABLE_NAME).size() < numRegions); + UTIL.expireSession(toKillRSThread.getRegionServer().getZooKeeper(), false); + UTIL.waitFor(30000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return liveRS.getRegions(TABLE_NAME).size() == numRegions; + } + + @Override + public String explainFailure() throws Exception { + return "Failover is not finished yet"; + } + }); + toKillRSThread.getRegionServer().stop("Stop for test"); + // make sure that we can successfully quit + toKillRSThread.join(); + } +} -- 2.7.4