From 4b4f35d11442a235ba06ce54ba566d309a9dd91f Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 30 Jul 2019 22:36:56 +0800 Subject: [PATCH] HBASE-22539 UT to reproduce the problem --- .../hbase/regionserver/wal/AbstractFSWAL.java | 2 +- .../regionserver/wal/TestAsyncFSWAL.java | 2 +- .../hbase/regionserver/wal/TestFSHLog.java | 2 +- ...tWALCorruptionDueToDanglingByteBuffer.java | 171 ++++++++++++++++++ 4 files changed, 174 insertions(+), 3 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALCorruptionDueToDanglingByteBuffer.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 2cadded87a..df6d4f8fd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -971,7 +971,7 @@ public abstract class AbstractFSWAL implements WAL { * Exposed for testing only. Use to tricks like halt the ring buffer appending. */ @VisibleForTesting - void atHeadOfRingBufferEventHandlerAppend() { + protected void atHeadOfRingBufferEventHandlerAppend() { // Noop } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 0fca99c112..4cfe49e9e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -113,7 +113,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) { @Override - void atHeadOfRingBufferEventHandlerAppend() { + protected void atHeadOfRingBufferEventHandlerAppend() { action.run(); super.atHeadOfRingBufferEventHandlerAppend(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index f288f74cd8..810d7df5d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -89,7 +89,7 @@ public class TestFSHLog extends AbstractTestFSWAL { prefix, suffix) { @Override - void atHeadOfRingBufferEventHandlerAppend() { + protected void atHeadOfRingBufferEventHandlerAppend() { action.run(); super.atHeadOfRingBufferEventHandlerAppend(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALCorruptionDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALCorruptionDueToDanglingByteBuffer.java new file mode 100644 index 0000000000..bcbd3f1b7b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALCorruptionDueToDanglingByteBuffer.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + +/** + * Testcase for HBASE-22539 + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestWALCorruptionDueToDanglingByteBuffer { + + private static final Logger LOG = + LoggerFactory.getLogger(TestWALCorruptionDueToDanglingByteBuffer.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static CountDownLatch ARRIVE; + + private static CountDownLatch RESUME; + + private static TableName TABLE_NAME = TableName.valueOf("Corruption"); + + private static byte[] CF = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + public static final class PauseWAL extends AsyncFSWAL { + + public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, + Configuration conf, List listeners, boolean failIfWALExists, + String prefix, String suffix, EventLoopGroup eventLoopGroup, + Class channelClass) throws FailedLogCloseException, IOException { + super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, + eventLoopGroup, channelClass); + } + + @Override + protected void atHeadOfRingBufferEventHandlerAppend() { + if (ARRIVE != null) { + ARRIVE.countDown(); + try { + RESUME.await(); + } catch (InterruptedException e) { + } + } + } + } + + public static final class PauseWALProvider extends AbstractFSWALProvider { + + private EventLoopGroup eventLoopGroup; + + private Class channelClass; + + @Override + protected PauseWAL createWAL() throws IOException { + return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), + getWALDirectoryName(factory.factoryId), + getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, + channelClass); + } + + @Override + protected void doInit(Configuration conf) throws IOException { + Pair> eventLoopGroupAndChannelClass = + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); + channelClass = eventLoopGroupAndChannelClass.getSecond(); + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class, + WALProvider.class); + UTIL.startMiniCluster(1); + UTIL.createTable(TABLE_NAME, CF); + UTIL.waitTableAvailable(TABLE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private byte[] getBytes(String prefix, int index) { + return Bytes.toBytes(String.format("%s-%08d", prefix, index)); + } + + @Test + public void test() throws Exception { + LOG.info("Stop WAL appending..."); + ARRIVE = new CountDownLatch(1); + RESUME = new CountDownLatch(1); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + LOG.info("Put 100 rows with " + Durability.ASYNC_WAL + "..."); + for (int i = 0; i < 100; i++) { + table.batch(Arrays.asList(new Put(getBytes("row", i)) + .addColumn(CF, CQ, getBytes("value", i)).setDurability(Durability.ASYNC_WAL)), + new Object[1]); + } + ARRIVE.await(); + ARRIVE = null; + LOG.info("Resume WAL appending..."); + RESUME.countDown(); + LOG.info("Put a single row to force a WAL sync..."); + table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value"))); + LOG.info("Abort the only region server"); + UTIL.getMiniHBaseCluster().abortRegionServer(0); + LOG.info("Start a new region server"); + UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000); + UTIL.waitTableAvailable(TABLE_NAME); + LOG.info("Check if all rows are still valid"); + for (int i = 0; i < 1000; i++) { + Result result = table.get(new Get(getBytes("row", i))); + assertEquals(Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ))); + } + Result result = table.get(new Get(Bytes.toBytes("row"))); + assertEquals("value", Bytes.toString(result.getValue(CF, CQ))); + } + } +} -- 2.17.1