From 756c86c6c9c50a5d4587d9ee6a5b88a6c2601772 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 26 Apr 2016 11:06:52 +0800 Subject: [PATCH] HBASE-15536 Make AsyncFSWAL as our default WAL --- .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 1 + .../hadoop/hbase/regionserver/wal/FSHLog.java | 2 +- .../org/apache/hadoop/hbase/wal/WALFactory.java | 2 +- .../hbase/regionserver/TestFSErrorsExposed.java | 6 +- .../hbase/regionserver/TestMobStoreScanner.java | 6 +- .../wal/AbstractTestLogRollPeriod.java | 156 +++ .../regionserver/wal/AbstractTestWALReplay.java | 1281 ++++++++++++++++++++ .../regionserver/wal/TestAsyncLogRollPeriod.java | 6 +- .../hbase/regionserver/wal/TestAsyncWALReplay.java | 6 +- .../wal/TestAsyncWALReplayCompressed.java | 6 +- .../hbase/regionserver/wal/TestLogRollAbort.java | 1 + .../hbase/regionserver/wal/TestLogRollPeriod.java | 138 +-- .../hbase/regionserver/wal/TestLogRolling.java | 1 + .../regionserver/wal/TestLogRollingNoCluster.java | 1 + .../regionserver/wal/TestSecureAsyncWALReplay.java | 2 +- .../regionserver/wal/TestSecureWALReplay.java | 10 +- .../hbase/regionserver/wal/TestWALReplay.java | 1256 +------------------ .../regionserver/wal/TestWALReplayCompressed.java | 10 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 2 + .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 11 +- 20 files changed, 1490 insertions(+), 1414 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 576bb29..1b2c390 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -30,6 +30,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 67c2b93..7b76aa2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -258,7 +258,7 @@ public class FSHLog extends AbstractFSWAL { /** * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the * default behavior (such as setting the maxRecoveryErrorCount value for example (see - * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the + * {@link AbstractTestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the * underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1 support is removed. * @return null if underlying stream is not ready. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index a9c17b5..fd0f2c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -74,7 +74,7 @@ public class WALFactory { * Maps between configuration names for providers and implementation classes. */ static enum Providers { - defaultProvider(DefaultWALProvider.class), + defaultProvider(AsyncFSWALProvider.class), filesystem(DefaultWALProvider.class), multiwal(RegionGroupingProvider.class), asyncfs(AsyncFSWALProvider.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index 94daf83..55f882a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -183,7 +183,8 @@ public class TestFSErrorsExposed { try { // Make it fail faster. util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - + util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000); + util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000); util.startMiniCluster(1); TableName tableName = TableName.valueOf("table"); byte[] fam = Bytes.toBytes("fam"); @@ -276,7 +277,4 @@ public class TestFSErrorsExposed { } } } - - - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java index 4781f23..04ad4c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -79,8 +80,9 @@ public class TestMobStoreScanner { public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); - TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100*1024*1024); - + TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100 * 1024 * 1024); + // TODO: AsyncFSWAL can not handle large edits right now, remove this after we fix the issue. + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem"); TEST_UTIL.startMiniCluster(1); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java new file mode 100644 index 0000000..f70bcc8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that verify that the log is forced to be rolled every "hbase.regionserver.logroll.period".
+ */
+public abstract class AbstractTestLogRollPeriod {
+  private static final Log LOG = LogFactory.getLog(AbstractTestLogRollPeriod.class);
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final static long LOG_ROLL_PERIOD = 4000;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // disable the ui
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.info.port", -1);
+
+    TEST_UTIL.getConfiguration().setLong("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests that the LogRoller performs the roll even if there are no edits
+   */
+  @Test
+  public void testNoEdits() throws Exception {
+    TableName tableName = TableName.valueOf("TestLogRollPeriodNoEdits");
+    TEST_UTIL.createTable(tableName, "cf");
+    try {
+      Table table = TEST_UTIL.getConnection().getTable(tableName);
+      try {
+        HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+        WAL log = server.getWAL(null);
+        checkMinLogRolls(log, 5);
+      } finally {
+        table.close();
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Tests that the LogRoller performs the roll with some data in the log
+   */
+  @Test(timeout=60000)
+  public void testWithEdits() throws Exception {
+    final TableName tableName = TableName.valueOf("TestLogRollPeriodWithEdits");
+    final String family = "cf";
+
+    TEST_UTIL.createTable(tableName, family);
+    try {
+      HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+      WAL log = server.getWAL(null);
+      final Table table = TEST_UTIL.getConnection().getTable(tableName);
+
+      Thread writerThread = new Thread("writer") {
+        @Override
+        public void run() {
+          try {
+            long row = 0;
+            while (!interrupted()) {
+              Put p = new Put(Bytes.toBytes(String.format("row%d", row)));
+              p.addColumn(Bytes.toBytes(family), Bytes.toBytes("col"), Bytes.toBytes(row));
+              table.put(p);
+              row++;
+
+              Thread.sleep(LOG_ROLL_PERIOD / 16);
+            }
+          } catch (Exception e) {
+            LOG.warn(e);
+          }
+        }
+      };
+
+      try {
+        writerThread.start();
+        checkMinLogRolls(log, 5);
+      } finally {
+        writerThread.interrupt();
+        writerThread.join();
+        table.close();
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  private void checkMinLogRolls(final WAL log, final int minRolls)
+      throws Exception {
+    final List<Path> paths = new ArrayList<Path>();
+    log.registerWALActionsListener(new WALActionsListener.Base() {
+      @Override
+      public void postLogRoll(Path oldFile, Path newFile) {
+        LOG.debug("postLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
+        paths.add(newFile);
+      }
+    });
+
+    // Sleep until we should get at least min-LogRoll events
+    long wtime = System.currentTimeMillis();
+    Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
+    // Do some extra sleep in case the machine is slow,
+    // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
+    final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size());
+    for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) {
+      Thread.sleep(LOG_ROLL_PERIOD / 4);
+    }
+    wtime = System.currentTimeMillis() - wtime;
+    // Guard the per-roll average against a divide-by-zero when no roll was observed at all.
+    LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls",
+      paths.size(), wtime, wtime / Math.max(1, paths.size()), minRolls));
+    assertFalse(paths.size() < minRolls);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
new file mode 100644
index 0000000..53a86f9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -0,0 +1,1281 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; +import org.apache.hadoop.hbase.regionserver.FlushRequestListener; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test replay of edits out of a WAL split. + */ +public abstract class AbstractTestWALReplay { + private static final Log LOG = LogFactory.getLog(AbstractTestWALReplay.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); + private Path hbaseRootDir = null; + private String logName; + private Path oldLogDir; + private Path logDir; + private FileSystem fs; + private Configuration conf; + private RecoveryMode mode; + private WALFactory wals; + + @Rule + public final TestName currentTest = new TestName(); + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean("dfs.support.append", true); + // The below config supported by 0.20-append and CDH3b2 + conf.setInt("dfs.client.block.recovery.retries", 2); + TEST_UTIL.startMiniCluster(3); + Path hbaseRootDir = + TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); + LOG.info("hbase.rootdir=" + hbaseRootDir); + FSUtils.setRootDir(conf, hbaseRootDir); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); + this.hbaseRootDir = FSUtils.getRootDir(this.conf); + this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual"); + this.logDir = new Path(this.hbaseRootDir, logName); + if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); + } + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); + this.wals = new WALFactory(conf, null, currentTest.getMethodName()); + } + + @After + public void tearDown() throws Exception { + this.wals.close(); + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); + } + + /* + * @param p Directory to cleanup + */ + private void deleteDir(final Path p) throws IOException { + if (this.fs.exists(p)) { + if (!this.fs.delete(p, true)) { + throw new IOException("Failed remove of " + p); + } + } + } + + /** + * + * @throws Exception + */ + @Test + public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { + final TableName tableName = + TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); + byte[] family1 = Bytes.toBytes("cf1"); + byte[] family2 = Bytes.toBytes("cf2"); + byte[] qualifier = Bytes.toBytes("q"); + byte[] value = Bytes.toBytes("testV"); + byte[][] familys = { family1, family2 }; + TEST_UTIL.createTable(tableName, familys); + Table htable = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(Bytes.toBytes("r1")); + put.addColumn(family1, qualifier, value); + htable.put(put); + ResultScanner resultScanner = htable.getScanner(new Scan()); + int count = 0; + while (resultScanner.next() != null) { + count++; + } + resultScanner.close(); + assertEquals(1, count); + + MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); + List regions = hbaseCluster.getRegions(tableName); + assertEquals(1, regions.size()); + + // move region to another regionserver + Region destRegion = regions.get(0); + int originServerNum = hbaseCluster + .getServerWith(destRegion.getRegionInfo().getRegionName()); + assertTrue("Please start more than 1 regionserver", hbaseCluster + .getRegionServerThreads().size() > 1); + int destServerNum = 0; + while (destServerNum == originServerNum) { + destServerNum++; + } + HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); + HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); + // move region to destination regionserver + moveRegionAndWait(destRegion, destServer); + + // delete the row + Delete del = new Delete(Bytes.toBytes("r1")); + htable.delete(del); + resultScanner = htable.getScanner(new Scan()); + count = 0; + while (resultScanner.next() != null) { + count++; + } + resultScanner.close(); + assertEquals(0, count); + + // flush region and make major compaction + Region region = destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); + region.flush(true); + // wait to complete major compaction + for (Store store : region.getStores()) { + store.triggerMajorCompaction(); + } + region.compact(true); + + // move region to origin regionserver + moveRegionAndWait(destRegion, originServer); + // abort the origin regionserver + originServer.abort("testing"); + + // see what we get + Result result = htable.get(new Get(Bytes.toBytes("r1"))); + if (result != null) { + assertTrue("Row is deleted, but we get" + result.toString(), + (result == null) || result.isEmpty()); + } + resultScanner.close(); + } + + private void moveRegionAndWait(Region destRegion, HRegionServer destServer) + throws InterruptedException, MasterNotRunningException, + ZooKeeperConnectionException, IOException { + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + TEST_UTIL.getHBaseAdmin().move( + destRegion.getRegionInfo().getEncodedNameAsBytes(), + Bytes.toBytes(destServer.getServerName().getServerName())); + while (true) { + ServerName serverName = 
master.getAssignmentManager() + .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo()); + if (serverName != null && serverName.equals(destServer.getServerName())) { + TEST_UTIL.assertRegionOnServer( + destRegion.getRegionInfo(), serverName, 200); + break; + } + Thread.sleep(10); + } + } + + /** + * Tests for hbase-2727. + * @throws Exception + * @see HBASE-2727 + */ + @Test + public void test2727() throws Exception { + // Test being able to have > 1 set of edits in the recovered.edits directory. + // Ensure edits are replayed properly. + final TableName tableName = + TableName.valueOf("test2727"); + + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); + deleteDir(basedir); + + HTableDescriptor htd = createBasic3FamilyHTD(tableName); + Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + HBaseTestingUtility.closeRegionAndWAL(region2); + final byte [] rowName = tableName.getName(); + + WAL wal1 = createWAL(this.conf); + // Add 1k to each family. + final int countPerFamily = 1000; + + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } + for (HColumnDescriptor hcd: htd.getFamilies()) { + addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, + wal1, htd, mvcc, scopes); + } + wal1.shutdown(); + runWALSplit(this.conf); + + WAL wal2 = createWAL(this.conf); + // Add 1k to each family. + for (HColumnDescriptor hcd: htd.getFamilies()) { + addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, + ee, wal2, htd, mvcc, scopes); + } + wal2.shutdown(); + runWALSplit(this.conf); + + WAL wal3 = createWAL(this.conf); + try { + HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3); + long seqid = region.getOpenSeqNum(); + // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. + // When opened, this region would apply 6k edits, and increment the sequenceId by 1 + assertTrue(seqid > mvcc.getWritePoint()); + assertEquals(seqid - 1, mvcc.getWritePoint()); + LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + + mvcc.getReadPoint()); + + // TODO: Scan all. + region.close(); + } finally { + wal3.close(); + } + } + + /** + * Test case of HRegion that is only made out of bulk loaded files. Assert + * that we don't 'crash'. 
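+   * <p>For orientation, a rough sketch of the scenario this test drives; the helper names are
+   * the ones defined in this class, and the sketch is illustrative rather than normative:
+   * <pre>
+   * WAL wal = createWAL(conf);
+   * Region region = HRegion.openHRegion(hri, htd, wal, conf); // region backed by that WAL
+   * region.bulkLoadHFiles(hfs, true, null);  // all data arrives via bulk load
+   * region.put(...); wal.sync();             // one normal edit so the WAL is not empty
+   * runWALSplit(conf);                       // simulate a crash by splitting the live WAL
+   * HRegion.openHRegion(...);                // reopen: replay must see the bulk-loaded rows too
+   * </pre>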
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testRegionMadeOfBulkLoadedFilesOnly()
+      throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region2);
+    WAL wal = createWAL(this.conf);
+    Region region = HRegion.openHRegion(hri, htd, wal, this.conf);
+
+    byte[] family = htd.getFamilies().iterator().next().getName();
+    Path f = new Path(basedir, "hfile");
+    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
+      Bytes.toBytes("z"), 10);
+    List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
+    hfs.add(Pair.newPair(family, f.toString()));
+    region.bulkLoadHFiles(hfs, true, null);
+
+    // Add an edit so there is something in the WAL
+    byte[] row = tableName.getName();
+    region.put((new Put(row)).addColumn(family, family, family));
+    wal.sync();
+    final int rowsInsertedCount = 11;
+
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // Now 'crash' the region by stealing its wal
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        WAL wal2 = createWAL(newConf);
+
+        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
+          hbaseRootDir, hri, htd, wal2);
+        long seqid2 = region2.getOpenSeqNum();
+        assertTrue(seqid2 > -1);
+        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
+
+        // I can't close wal1. It has been appropriated when we split.
+        region2.close();
+        wal2.close();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
+   * files) and an edit in the memstore.
+   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
+   * from being replayed".
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testCompactedBulkLoadedFiles()
+      throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region2);
+    WAL wal = createWAL(this.conf);
+    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
+
+    // Add an edit so there is something in the WAL
+    byte[] row = tableName.getName();
+    byte[] family = htd.getFamilies().iterator().next().getName();
+    region.put((new Put(row)).addColumn(family, family, family));
+    wal.sync();
+
+    List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
+    for (int i = 0; i < 3; i++) {
+      Path f = new Path(basedir, "hfile" + i);
+      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
+        Bytes.toBytes(i + "50"), 10);
+      hfs.add(Pair.newPair(family, f.toString()));
+    }
+    region.bulkLoadHFiles(hfs, true, null);
+    final int rowsInsertedCount = 31;
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // major compact to turn all the bulk loaded files into one normal file
+    region.compact(true);
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // Now 'crash' the region by stealing its wal
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        WAL wal2 = createWAL(newConf);
+
+        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
+          hbaseRootDir, hri, htd, wal2);
+        long seqid2 = region2.getOpenSeqNum();
+        assertTrue(seqid2 > -1);
+        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
+
+        // I can't close wal1. It has been appropriated when we split.
+        region2.close();
+        wal2.close();
+        return null;
+      }
+    });
+  }
+
+
+  /**
+   * Test writing edits into an HRegion, closing it, splitting logs, opening
+   * Region again. Verify seqids.
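+   * <p>The seqid arithmetic being asserted, in sketch form (N edits written before the close;
+   * names are illustrative):
+   * <pre>
+   * long seqid = region.getOpenSeqNum();    // open seqnum before the edits
+   * // ... write N edits, close without flushing, split the log ...
+   * long seqid2 = region2.getOpenSeqNum();  // open seqnum after replay
+   * assertTrue(seqid + N &lt; seqid2);      // replay must advance past every replayed edit
+   * </pre>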
+ * @throws IOException + * @throws IllegalAccessException + * @throws NoSuchFieldException + * @throws IllegalArgumentException + * @throws SecurityException + */ + @Test + public void testReplayEditsWrittenViaHRegion() + throws IOException, SecurityException, IllegalArgumentException, + NoSuchFieldException, IllegalAccessException, InterruptedException { + final TableName tableName = + TableName.valueOf("testReplayEditsWrittenViaHRegion"); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); + deleteDir(basedir); + final byte[] rowName = tableName.getName(); + final int countPerFamily = 10; + final HTableDescriptor htd = createBasic3FamilyHTD(tableName); + HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + HBaseTestingUtility.closeRegionAndWAL(region3); + // Write countPerFamily edits into the three families. Do a flush on one + // of the families during the load of edits so its seqid is not same as + // others to test we do right thing when different seqids. + WAL wal = createWAL(this.conf); + HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); + long seqid = region.getOpenSeqNum(); + boolean first = true; + for (HColumnDescriptor hcd: htd.getFamilies()) { + addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); + if (first) { + // If first, so we have at least one family w/ different seqid to rest. + region.flush(true); + first = false; + } + } + // Now assert edits made it in. + final Get g = new Get(rowName); + Result result = region.get(g); + assertEquals(countPerFamily * htd.getFamilies().size(), + result.size()); + // Now close the region (without flush), split the log, reopen the region and assert that + // replay of log has the correct effect, that our seqids are calculated correctly so + // all edits in logs are seen as 'stale'/old. + region.close(true); + wal.shutdown(); + runWALSplit(this.conf); + WAL wal2 = createWAL(this.conf); + HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); + long seqid2 = region2.getOpenSeqNum(); + assertTrue(seqid + result.size() < seqid2); + final Result result1b = region2.get(g); + assertEquals(result.size(), result1b.size()); + + // Next test. Add more edits, then 'crash' this region by stealing its wal + // out from under it and assert that replay of the log adds the edits back + // correctly when region is opened again. + for (HColumnDescriptor hcd: htd.getFamilies()) { + addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); + } + // Get count of edits. + final Result result2 = region2.get(g); + assertEquals(2 * result.size(), result2.size()); + wal2.sync(); + final Configuration newConf = HBaseConfiguration.create(this.conf); + User user = HBaseTestingUtility.getDifferentUser(newConf, + tableName.getNameAsString()); + user.runAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + runWALSplit(newConf); + FileSystem newFS = FileSystem.get(newConf); + // Make a new wal for new region open. 
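+        // (The HRegion subclass below overrides restoreEdit(..) purely to count the cells
+        // that WAL replay pushes back into the memstore; the behavior is unchanged.)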
+ WAL wal3 = createWAL(newConf); + final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); + HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { + @Override + protected boolean restoreEdit(Store s, Cell cell) { + boolean b = super.restoreEdit(s, cell); + countOfRestoredEdits.incrementAndGet(); + return b; + } + }; + long seqid3 = region3.initialize(); + Result result3 = region3.get(g); + // Assert that count of cells is same as before crash. + assertEquals(result2.size(), result3.size()); + assertEquals(htd.getFamilies().size() * countPerFamily, + countOfRestoredEdits.get()); + + // I can't close wal1. Its been appropriated when we split. + region3.close(); + wal3.close(); + return null; + } + }); + } + + /** + * Test that we recover correctly when there is a failure in between the + * flushes. i.e. Some stores got flushed but others did not. + * + * Unfortunately, there is no easy hook to flush at a store level. The way + * we get around this is by flushing at the region level, and then deleting + * the recently flushed store file for one of the Stores. This would put us + * back in the situation where all but that store got flushed and the region + * died. + * + * We restart Region again, and verify that the edits were replayed. + * + * @throws IOException + * @throws IllegalAccessException + * @throws NoSuchFieldException + * @throws IllegalArgumentException + * @throws SecurityException + */ + @Test + public void testReplayEditsAfterPartialFlush() + throws IOException, SecurityException, IllegalArgumentException, + NoSuchFieldException, IllegalAccessException, InterruptedException { + final TableName tableName = + TableName.valueOf("testReplayEditsWrittenViaHRegion"); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); + deleteDir(basedir); + final byte[] rowName = tableName.getName(); + final int countPerFamily = 10; + final HTableDescriptor htd = createBasic3FamilyHTD(tableName); + HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + HBaseTestingUtility.closeRegionAndWAL(region3); + // Write countPerFamily edits into the three families. Do a flush on one + // of the families during the load of edits so its seqid is not same as + // others to test we do right thing when different seqids. + WAL wal = createWAL(this.conf); + HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); + long seqid = region.getOpenSeqNum(); + for (HColumnDescriptor hcd: htd.getFamilies()) { + addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); + } + + // Now assert edits made it in. + final Get g = new Get(rowName); + Result result = region.get(g); + assertEquals(countPerFamily * htd.getFamilies().size(), + result.size()); + + // Let us flush the region + region.flush(true); + region.close(true); + wal.shutdown(); + + // delete the store files in the second column family to simulate a failure + // in between the flushcache(); + // we have 3 families. killing the middle one ensures that taking the maximum + // will make us fail. 
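+    // (With the families "a", "b" and "c" created by createBasic3FamilyHTD, the loop below
+    // removes the store files of the second family, "b".)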
int cf_count = 0;
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      cf_count++;
+      if (cf_count == 2) {
+        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
+      }
+    }
+
+    // Let us try to split and recover
+    runWALSplit(this.conf);
+    WAL wal2 = createWAL(this.conf);
+    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
+    long seqid2 = region2.getOpenSeqNum();
+    assertTrue(seqid + result.size() < seqid2);
+
+    final Result result1b = region2.get(g);
+    assertEquals(result.size(), result1b.size());
+  }
+
+
+  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
+  // Only throws exception if throwExceptionWhenFlushing is set true.
+  public static class CustomStoreFlusher extends DefaultStoreFlusher {
+    // Switch between throw and not throw exception in flush
+    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+
+    public CustomStoreFlusher(Configuration conf, Store store) {
+      super(conf, store);
+    }
+
+    @Override
+    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+        MonitoredTask status, ThroughputController throughputController) throws IOException {
+      if (throwExceptionWhenFlushing.get()) {
+        throw new IOException("Simulated exception by tests");
+      }
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController);
+    }
+  };
+
+  /**
+   * Test that we can recover the data correctly after aborting a flush. In the test, we first
+   * abort a flush after writing some data, then write more data and flush again; at the end we
+   * verify the data.
+   * @throws IOException
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Open the region with a custom store flusher (see CustomStoreFlusher above) that we can
+    // switch to throw on demand, so the first flush can be aborted mid-way.
+    WAL wal = createWAL(this.conf);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
+    Configuration customConf = new Configuration(this.conf);
+    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+      CustomStoreFlusher.class.getName());
+    HRegion region =
+      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
+    int writtenRowCount = 10;
+    List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(htd.getFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+        Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+ RegionScanner scanner = region.getScanner(new Scan()); + assertEquals(writtenRowCount, getScannedCount(scanner)); + + // Let us flush the region + CustomStoreFlusher.throwExceptionWhenFlushing.set(true); + try { + region.flush(true); + fail("Injected exception hasn't been thrown"); + } catch (Throwable t) { + LOG.info("Expected simulated exception when flushing region," + + t.getMessage()); + // simulated to abort server + Mockito.doReturn(true).when(rsServices).isAborted(); + region.setClosing(false); // region normally does not accept writes after + // DroppedSnapshotException. We mock around it for this test. + } + // writing more data + int moreRow = 10; + for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { + Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); + put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), + Bytes.toBytes("val")); + region.put(put); + } + writtenRowCount += moreRow; + // call flush again + CustomStoreFlusher.throwExceptionWhenFlushing.set(false); + try { + region.flush(true); + } catch (IOException t) { + LOG.info("Expected exception when flushing region because server is stopped," + + t.getMessage()); + } + + region.close(true); + wal.shutdown(); + + // Let us try to split and recover + runWALSplit(this.conf); + WAL wal2 = createWAL(this.conf); + Mockito.doReturn(false).when(rsServices).isAborted(); + HRegion region2 = + HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); + scanner = region2.getScanner(new Scan()); + assertEquals(writtenRowCount, getScannedCount(scanner)); + } + + private int getScannedCount(RegionScanner scanner) throws IOException { + int scannedCount = 0; + List results = new ArrayList(); + while (true) { + boolean existMore = scanner.next(results); + if (!results.isEmpty()) + scannedCount++; + if (!existMore) + break; + results.clear(); + } + return scannedCount; + } + + /** + * Create an HRegion with the result of a WAL split and test we only see the + * good edits + * @throws Exception + */ + @Test + public void testReplayEditsWrittenIntoWAL() throws Exception { + final TableName tableName = + TableName.valueOf("testReplayEditsWrittenIntoWAL"); + final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); + deleteDir(basedir); + + final HTableDescriptor htd = createBasic3FamilyHTD(tableName); + HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + HBaseTestingUtility.closeRegionAndWAL(region2); + final WAL wal = createWAL(this.conf); + final byte[] rowName = tableName.getName(); + final byte[] regionName = hri.getEncodedNameAsBytes(); + + // Add 1k to each family. + final int countPerFamily = 1000; + Set familyNames = new HashSet(); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } + for (HColumnDescriptor hcd: htd.getFamilies()) { + addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, + ee, wal, htd, mvcc, scopes); + familyNames.add(hcd.getName()); + } + + // Add a cache flush, shouldn't have any effect + wal.startCacheFlush(regionName, familyNames); + wal.completeCacheFlush(regionName); + + // Add an edit to another family, should be skipped. 
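+    // ("another family" does not exist in the table descriptor, so these cells must be
+    // skipped during replay rather than surfacing in the verification scan later on.)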
+ WALEdit edit = new WALEdit(); + long now = ee.currentTime(); + edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, + now, rowName)); + wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + true); + + // Delete the c family to verify deletes make it over. + edit = new WALEdit(); + now = ee.currentTime(); + edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); + wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + true); + + // Sync. + wal.sync(); + // Make a new conf and a new fs for the splitter to run on so we can take + // over old wal. + final Configuration newConf = HBaseConfiguration.create(this.conf); + User user = HBaseTestingUtility.getDifferentUser(newConf, + ".replay.wal.secondtime"); + user.runAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + runWALSplit(newConf); + FileSystem newFS = FileSystem.get(newConf); + // 100k seems to make for about 4 flushes during HRegion#initialize. + newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); + // Make a new wal for new region. + WAL newWal = createWAL(newConf); + final AtomicInteger flushcount = new AtomicInteger(0); + try { + final HRegion region = + new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { + @Override + protected FlushResult internalFlushcache(final WAL wal, final long myseqid, + final Collection storesToFlush, MonitoredTask status, + boolean writeFlushWalMarker) + throws IOException { + LOG.info("InternalFlushCache Invoked"); + FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush, + Mockito.mock(MonitoredTask.class), writeFlushWalMarker); + flushcount.incrementAndGet(); + return fs; + } + }; + // The seq id this region has opened up with + long seqid = region.initialize(); + + // The mvcc readpoint of from inserting data. + long writePoint = mvcc.getWritePoint(); + + // We flushed during init. 
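+          // (The flushes are forced by the 100k HREGION_MEMSTORE_FLUSH_SIZE set on newConf
+          // above, which is crossed repeatedly while the recovered edits are applied.)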
+ assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); + assertTrue((seqid - 1) == writePoint); + + Get get = new Get(rowName); + Result result = region.get(get); + // Make sure we only see the good edits + assertEquals(countPerFamily * (htd.getFamilies().size() - 1), + result.size()); + region.close(); + } finally { + newWal.close(); + } + return null; + } + }); + } + + @Test + // the following test is for HBASE-6065 + public void testSequentialEditLogSeqNum() throws IOException { + final TableName tableName = TableName.valueOf(currentTest.getMethodName()); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = + FSUtils.getTableDir(this.hbaseRootDir, tableName); + deleteDir(basedir); + final byte[] rowName = tableName.getName(); + final int countPerFamily = 10; + final HTableDescriptor htd = createBasic1FamilyHTD(tableName); + + // Mock the WAL + MockWAL wal = createMockWAL(); + + HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); + for (HColumnDescriptor hcd : htd.getFamilies()) { + addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); + } + + // Let us flush the region + // But this time completeflushcache is not yet done + region.flush(true); + for (HColumnDescriptor hcd : htd.getFamilies()) { + addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); + } + long lastestSeqNumber = region.getReadPoint(null); + // get the current seq no + wal.doCompleteCacheFlush = true; + // allow complete cache flush with the previous seq number got after first + // set of edits. + wal.completeCacheFlush(hri.getEncodedNameAsBytes()); + wal.shutdown(); + FileStatus[] listStatus = wal.getFiles(); + assertNotNull(listStatus); + assertTrue(listStatus.length > 0); + WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], + this.fs, this.conf, null, null, null, mode, wals); + FileStatus[] listStatus1 = this.fs.listStatus( + new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), + "recovered.edits")), new PathFilter() { + @Override + public boolean accept(Path p) { + if (WALSplitter.isSequenceIdFile(p)) { + return false; + } + return true; + } + }); + int editCount = 0; + for (FileStatus fileStatus : listStatus1) { + editCount = Integer.parseInt(fileStatus.getPath().getName()); + } + // The sequence number should be same + assertEquals( + "The sequence number of the recoverd.edits and the current edit seq should be same", + lastestSeqNumber, editCount); + } + + /** + * testcase for https://issues.apache.org/jira/browse/HBASE-15252 + */ + @Test + public void testDatalossWhenInputError() throws IOException, InstantiationException, + IllegalAccessException { + final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); + deleteDir(basedir); + final byte[] rowName = tableName.getName(); + final int countPerFamily = 10; + final HTableDescriptor htd = createBasic1FamilyHTD(tableName); + HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + Path regionDir = region1.getRegionFileSystem().getRegionDir(); + HBaseTestingUtility.closeRegionAndWAL(region1); + + WAL wal = createWAL(this.conf); + HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); + for (HColumnDescriptor hcd : htd.getFamilies()) { + addRegionEdits(rowName, hcd.getName(), 
countPerFamily, this.ee, region, "x"); + } + // Now assert edits made it in. + final Get g = new Get(rowName); + Result result = region.get(g); + assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); + // Now close the region (without flush), split the log, reopen the region and assert that + // replay of log has the correct effect. + region.close(true); + wal.shutdown(); + + runWALSplit(this.conf); + + // here we let the DFSInputStream throw an IOException just after the WALHeader. + Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first(); + FSDataInputStream stream = fs.open(editFile); + stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); + Class logReaderClass = + conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + DefaultWALProvider.Reader.class); + DefaultWALProvider.Reader reader = logReaderClass.newInstance(); + reader.init(this.fs, editFile, conf, stream); + final long headerLength = stream.getPos(); + reader.close(); + FileSystem spyFs = spy(this.fs); + doAnswer(new Answer() { + + @Override + public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { + FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); + Field field = FilterInputStream.class.getDeclaredField("in"); + field.setAccessible(true); + final DFSInputStream in = (DFSInputStream) field.get(stream); + DFSInputStream spyIn = spy(in); + doAnswer(new Answer() { + + private long pos; + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + if (pos >= headerLength) { + throw new IOException("read over limit"); + } + int b = (Integer) invocation.callRealMethod(); + if (b > 0) { + pos += b; + } + return b; + } + }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class)); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + invocation.callRealMethod(); + in.close(); + return null; + } + }).when(spyIn).close(); + field.set(stream, spyIn); + return stream; + } + }).when(spyFs).open(eq(editFile)); + + WAL wal2 = createWAL(this.conf); + HRegion region2; + try { + // log replay should fail due to the IOException, otherwise we may lose data. + region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); + assertEquals(result.size(), region2.get(g).size()); + } catch (IOException e) { + assertEquals("read over limit", e.getMessage()); + } + region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); + assertEquals(result.size(), region2.get(g).size()); + } + + /** + * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 
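+   * <p>Roughly, two WALs carrying an overlapping entry are split back to back, so both splits
+   * try to produce a recovered.edits file whose name derives from the same sequence id; the
+   * second split must not clobber the first. A sketch (illustrative, not the exact naming
+   * scheme):
+   * <pre>
+   * writerWALFile(new Path(logDir, "wal-1"), Arrays.asList(entry1, entry2)); // large file
+   * writerWALFile(new Path(logDir, "wal-2"), Arrays.asList(entry2));         // small file
+   * WALSplitter.splitLogFile(hbaseRootDir, first, ...);   // writes recovered.edits
+   * WALSplitter.splitLogFile(hbaseRootDir, second, ...);  // must handle the name conflict
+   * </pre>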
+ */ + private void testNameConflictWhenSplit(boolean largeFirst) throws IOException { + final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); + final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); + deleteDir(basedir); + + final HTableDescriptor htd = createBasic1FamilyHTD(tableName); + NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + for (byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } + HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + HBaseTestingUtility.closeRegionAndWAL(region); + final byte[] family = htd.getColumnFamilies()[0].getName(); + final byte[] rowName = tableName.getName(); + FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); + FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); + + Path largeFile = new Path(logDir, "wal-1"); + Path smallFile = new Path(logDir, "wal-2"); + writerWALFile(largeFile, Arrays.asList(entry1, entry2)); + writerWALFile(smallFile, Arrays.asList(entry2)); + FileStatus first, second; + if (largeFirst) { + first = fs.getFileStatus(largeFile); + second = fs.getFileStatus(smallFile); + } else { + first = fs.getFileStatus(smallFile); + second = fs.getFileStatus(largeFile); + } + WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, + RecoveryMode.LOG_SPLITTING, wals); + WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, + RecoveryMode.LOG_SPLITTING, wals); + WAL wal = createWAL(this.conf); + region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); + assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); + assertEquals(2, region.get(new Get(rowName)).size()); + } + + @Test + public void testNameConflictWhenSplit0() throws IOException { + testNameConflictWhenSplit(true); + } + + @Test + public void testNameConflictWhenSplit1() throws IOException { + testNameConflictWhenSplit(false); + } + + static class MockWAL extends FSHLog { + boolean doCompleteCacheFlush = false; + + public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) + throws IOException { + super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); + } + + @Override + public void completeCacheFlush(byte[] encodedRegionName) { + if (!doCompleteCacheFlush) { + return; + } + super.completeCacheFlush(encodedRegionName); + } + } + + private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) { + HTableDescriptor htd = new HTableDescriptor(tableName); + HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); + htd.addFamily(a); + return htd; + } + + private MockWAL createMockWAL() throws IOException { + MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); + // Set down maximum recovery so we dfsclient doesn't linger retrying something + // long gone. + HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); + return wal; + } + + // Flusher used in this test. Keep count of how often we are called and + // actually run the flush inside here. 
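+  // (Only requestFlush is meaningfully implemented below; the remaining FlushRequester
+  // methods are deliberate no-op stubs, since these tests never exercise them.)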
+ class TestFlusher implements FlushRequester { + private HRegion r; + + @Override + public void requestFlush(Region region, boolean force) { + try { + r.flush(force); + } catch (IOException e) { + throw new RuntimeException("Exception flushing", e); + } + } + + @Override + public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) { + // TODO Auto-generated method stub + + } + + @Override + public void registerFlushRequestListener(FlushRequestListener listener) { + + } + + @Override + public boolean unregisterFlushRequestListener(FlushRequestListener listener) { + return false; + } + + @Override + public void setGlobalMemstoreLimit(long globalMemStoreSize) { + + } + } + + private WALKey createWALKey(final TableName tableName, final HRegionInfo hri, + final MultiVersionConcurrencyControl mvcc, NavigableMap scopes) { + return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); + } + + private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, + int index) { + byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); + byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); + return edit; + } + + private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, + byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, + int index, NavigableMap scopes) throws IOException { + FSWALEntry entry = + new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit( + rowName, family, ee, index), hri, true); + entry.stampRegionSequenceId(); + return entry; + } + + private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, + final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, + final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, + NavigableMap scopes) throws IOException { + for (int j = 0; j < count; j++) { + wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), + createWALEdit(rowName, family, ee, j), true); + } + wal.sync(); + } + + static List addRegionEdits(final byte[] rowName, final byte[] family, final int count, + EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { + List puts = new ArrayList(); + for (int j = 0; j < count; j++) { + byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); + Put p = new Put(rowName); + p.addColumn(family, qualifier, ee.currentTime(), rowName); + r.put(p); + puts.add(p); + } + return puts; + } + + /* + * Creates an HRI around an HTD that has tableName and three + * column families named 'a','b', and 'c'. + * @param tableName Name of table to use when we create HTableDescriptor. + */ + private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { + return new HRegionInfo(tableName, null, null, false); + } + + /* + * Run the split. Verify only single split file made. 
+   * @param c
+   * @return The single split file made
+   * @throws IOException
+   */
+  private Path runWALSplit(final Configuration c) throws IOException {
+    List<Path> splits = WALSplitter.split(
+      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
+    // Split should generate only 1 file since there's only 1 region
+    assertEquals("splits=" + splits, 1, splits.size());
+    // Make sure the file exists
+    assertTrue(fs.exists(splits.get(0)));
+    LOG.info("Split file=" + splits.get(0));
+    return splits.get(0);
+  }
+
+  /*
+   * @param c
+   * @return WAL with retries set down from 5 to 1 only.
+   * @throws IOException
+   */
+  private WAL createWAL(final Configuration c) throws IOException {
+    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
+    // Set down maximum recovery so the dfsclient doesn't linger retrying something
+    // long gone.
+    HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
+    return wal;
+  }
+
+  private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
+    htd.addFamily(a);
+    HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
+    htd.addFamily(b);
+    HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
+    htd.addFamily(c);
+    return htd;
+  }
+
+  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException {
+    fs.mkdirs(file.getParent());
+    ProtobufLogWriter writer = new ProtobufLogWriter();
+    writer.init(fs, file, conf, true);
+    for (FSWALEntry entry : entries) {
+      writer.append(entry);
+    }
+    writer.sync();
+    writer.close();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java
index bedb915..af022fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java
@@ -25,12 +25,12 @@ import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 @Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncLogRollPeriod extends TestLogRollPeriod {
+public class TestAsyncLogRollPeriod extends AbstractTestLogRollPeriod {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestLogRollPeriod.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestLogRollPeriod.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
-    TestLogRollPeriod.setUpBeforeClass();
+    AbstractTestLogRollPeriod.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
index ca415fd..3938ecd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
@@ -25,12 +25,12 @@ import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 @Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncWALReplay extends TestWALReplay {
+public class TestAsyncWALReplay extends AbstractTestWALReplay {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
-    TestWALReplay.setUpBeforeClass();
+    AbstractTestWALReplay.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java
index 3b8869b..16358a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java
@@ -26,13 +26,13 @@ import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 @Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncWALReplayCompressed extends TestWALReplay {
+public class TestAsyncWALReplayCompressed extends AbstractTestWALReplay {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
     conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-    TestWALReplay.setUpBeforeClass();
+    AbstractTestWALReplay.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index b7c1c73..b4baa74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -96,6 +96,7 @@ public class TestLogRollAbort {
     // the namenode might still try to choose the recently-dead datanode
     // for a pipeline, so try to a new pipeline multiple times
     TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
+    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem");
   }
 
   private Configuration conf;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java
index 1141871..2449942 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java
@@ -17,144 +17,20 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.junit.AfterClass;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-/**
- * Tests that verifies that the log is forced to be rolled every "hbase.regionserver.logroll.period"
- */
-@Category({RegionServerTests.class, MediumTests.class})
-public class TestLogRollPeriod {
-  private static final Log LOG = LogFactory.getLog(AbstractTestLogRolling.class);
-
-  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private final static long LOG_ROLL_PERIOD = 4000;
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestLogRollPeriod extends AbstractTestLogRollPeriod {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    // disable the ui
-    TEST_UTIL.getConfiguration().setInt("hbase.regionsever.info.port", -1);
-
-    TEST_UTIL.getConfiguration().setLong("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
-
-    TEST_UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * Tests that the LogRoller perform the roll even if there are no edits
-   */
-  @Test
-  public void testNoEdits() throws Exception {
-    TableName tableName = TableName.valueOf("TestLogRollPeriodNoEdits");
-    TEST_UTIL.createTable(tableName, "cf");
-    try {
-      Table table = TEST_UTIL.getConnection().getTable(tableName);
-      try {
-        HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-        WAL log = server.getWAL(null);
-        checkMinLogRolls(log, 5);
-      } finally {
-        table.close();
-      }
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
-  }
-
-  /**
-   * Tests that the LogRoller perform the roll with some data in the log
-   */
-  @Test(timeout=60000)
-  public void testWithEdits() throws Exception {
-    final TableName tableName = TableName.valueOf("TestLogRollPeriodWithEdits");
-    final String family = "cf";
-
-    TEST_UTIL.createTable(tableName, family);
-    try {
-      HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-      WAL log = server.getWAL(null);
-      final Table table = TEST_UTIL.getConnection().getTable(tableName);
-
-      Thread writerThread = new Thread("writer") {
-        @Override
-        public void run() {
-          try {
-            long row = 0;
-            while (!interrupted()) {
-              Put p = new Put(Bytes.toBytes(String.format("row%d", row)));
-              p.addColumn(Bytes.toBytes(family), Bytes.toBytes("col"), Bytes.toBytes(row));
-              table.put(p);
-              row++;
-
-              Thread.sleep(LOG_ROLL_PERIOD / 16);
-            }
-          } catch (Exception e) {
-            LOG.warn(e);
-          }
-        }
-      };
-
-      try {
-        writerThread.start();
-        checkMinLogRolls(log, 5);
-      } finally {
-        writerThread.interrupt();
-        writerThread.join();
-        table.close();
-      }
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
-  }
-
-  private void checkMinLogRolls(final WAL log, final int minRolls)
-      throws Exception {
-    final List<Path> paths = new ArrayList<Path>();
-    log.registerWALActionsListener(new WALActionsListener.Base() {
-      @Override
-      public void postLogRoll(Path oldFile, Path newFile) {
-        LOG.debug("postLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
-        paths.add(newFile);
-      }
-    });
-
-    // Sleep until we should get at least min-LogRoll events
-    long wtime = System.currentTimeMillis();
-    Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
-    // Do some extra sleep in case the machine is slow,
-    // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
- final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size()); - for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) { - Thread.sleep(LOG_ROLL_PERIOD / 4); - } - wtime = System.currentTimeMillis() - wtime; - LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls", - paths.size(), wtime, wtime / paths.size(), minRolls)); - assertFalse(paths.size() < minRolls); + Configuration conf = AbstractTestLogRollPeriod.TEST_UTIL.getConfiguration(); + conf.set(WALFactory.WAL_PROVIDER, "filesystem"); + AbstractTestLogRollPeriod.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 37b23e0..931b79d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -82,6 +82,7 @@ public class TestLogRolling extends AbstractTestLogRolling { TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2); TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3); + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem"); AbstractTestLogRolling.setUpBeforeClass(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 9ab7b7d..eda7df7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -70,6 +70,7 @@ public class TestLogRollingNoCluster { // The implementation needs to know the 'handler' count. 
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT); final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.set(WALFactory.WAL_PROVIDER, "filesystem"); FSUtils.setRootDir(conf, dir); final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName()); final WAL wal = wals.getWAL(new byte[]{}, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java index 73c0216..5b8b404 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java @@ -32,7 +32,7 @@ public class TestSecureAsyncWALReplay extends TestAsyncWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java index be5d951..91172ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java @@ -19,21 +19,20 @@ package org.apache.hadoop.hbase.regionserver.wal; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALProvider.Writer; - import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -@Category({RegionServerTests.class, MediumTests.class}) +@Category({ RegionServerTests.class, MediumTests.class }) public class TestSecureWALReplay extends TestWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, @@ -41,7 +40,6 @@ public class TestSecureWALReplay extends TestWALReplay { conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, Writer.class); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); - TestWALReplay.setUpBeforeClass(); + AbstractTestWALReplay.setUpBeforeClass(); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 3e894d7..78a9184 
100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,1265 +17,20 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.lang.reflect.Field; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; -import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; -import org.apache.hadoop.hbase.regionserver.FlushRequestListener; -import org.apache.hadoop.hbase.regionserver.FlushRequester; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import 
org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.HFileTestUtil; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter; -import org.apache.hadoop.hdfs.DFSInputStream; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Test replay of edits out of a WAL split. - */ -@Category({RegionServerTests.class, MediumTests.class}) -public class TestWALReplay { - private static final Log LOG = LogFactory.getLog(TestWALReplay.class); - static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); - private Path hbaseRootDir = null; - private String logName; - private Path oldLogDir; - private Path logDir; - private FileSystem fs; - private Configuration conf; - private RecoveryMode mode; - private WALFactory wals; - - @Rule - public final TestName currentTest = new TestName(); +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestWALReplay extends AbstractTestWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setBoolean("dfs.support.append", true); - // The below config supported by 0.20-append and CDH3b2 - conf.setInt("dfs.client.block.recovery.retries", 2); - TEST_UTIL.startMiniCluster(3); - Path hbaseRootDir = - TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); - LOG.info("hbase.rootdir=" + hbaseRootDir); - FSUtils.setRootDir(conf, hbaseRootDir); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Before - public void setUp() throws Exception { - this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); - this.hbaseRootDir = FSUtils.getRootDir(this.conf); - this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual"); - this.logDir = new Path(this.hbaseRootDir, logName); - if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { - TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); - } - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
-      RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
-    this.wals = new WALFactory(conf, null, currentTest.getMethodName());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    this.wals.close();
-    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
-  }
-
-  /*
-   * @param p Directory to cleanup
-   */
-  private void deleteDir(final Path p) throws IOException {
-    if (this.fs.exists(p)) {
-      if (!this.fs.delete(p, true)) {
-        throw new IOException("Failed remove of " + p);
-      }
-    }
-  }
-
-  /**
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
-    final TableName tableName =
-        TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
-    byte[] family1 = Bytes.toBytes("cf1");
-    byte[] family2 = Bytes.toBytes("cf2");
-    byte[] qualifier = Bytes.toBytes("q");
-    byte[] value = Bytes.toBytes("testV");
-    byte[][] familys = { family1, family2 };
-    TEST_UTIL.createTable(tableName, familys);
-    Table htable = TEST_UTIL.getConnection().getTable(tableName);
-    Put put = new Put(Bytes.toBytes("r1"));
-    put.addColumn(family1, qualifier, value);
-    htable.put(put);
-    ResultScanner resultScanner = htable.getScanner(new Scan());
-    int count = 0;
-    while (resultScanner.next() != null) {
-      count++;
-    }
-    resultScanner.close();
-    assertEquals(1, count);
-
-    MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
-    List<HRegion> regions = hbaseCluster.getRegions(tableName);
-    assertEquals(1, regions.size());
-
-    // move region to another regionserver
-    Region destRegion = regions.get(0);
-    int originServerNum = hbaseCluster
-        .getServerWith(destRegion.getRegionInfo().getRegionName());
-    assertTrue("Please start more than 1 regionserver", hbaseCluster
-        .getRegionServerThreads().size() > 1);
-    int destServerNum = 0;
-    while (destServerNum == originServerNum) {
-      destServerNum++;
-    }
-    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
-    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
-    // move region to destination regionserver
-    moveRegionAndWait(destRegion, destServer);
-
-    // delete the row
-    Delete del = new Delete(Bytes.toBytes("r1"));
-    htable.delete(del);
-    resultScanner = htable.getScanner(new Scan());
-    count = 0;
-    while (resultScanner.next() != null) {
-      count++;
-    }
-    resultScanner.close();
-    assertEquals(0, count);
-
-    // flush region and make major compaction
-    Region region = destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
-    region.flush(true);
-    // wait to complete major compaction
-    for (Store store : region.getStores()) {
-      store.triggerMajorCompaction();
-    }
-    region.compact(true);
-
-    // move region to origin regionserver
-    moveRegionAndWait(destRegion, originServer);
-    // abort the origin regionserver
-    originServer.abort("testing");
-
-    // see what we get
-    Result result = htable.get(new Get(Bytes.toBytes("r1")));
-    if (result != null) {
-      assertTrue("Row is deleted, but we get" + result.toString(),
-          (result == null) || result.isEmpty());
-    }
-    resultScanner.close();
-  }
-
-  private void moveRegionAndWait(Region destRegion, HRegionServer destServer)
-      throws InterruptedException, MasterNotRunningException,
-      ZooKeeperConnectionException, IOException {
-    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
-    TEST_UTIL.getHBaseAdmin().move(
-        destRegion.getRegionInfo().getEncodedNameAsBytes(),
-        Bytes.toBytes(destServer.getServerName().getServerName()));
-    while (true) {
-      ServerName serverName = master.getAssignmentManager()
-          .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
-      if (serverName != null && serverName.equals(destServer.getServerName())) {
-        TEST_UTIL.assertRegionOnServer(
-            destRegion.getRegionInfo(), serverName, 200);
-        break;
-      }
-      Thread.sleep(10);
-    }
-  }
-
-  /**
-   * Tests for hbase-2727.
-   * @throws Exception
-   * @see HBASE-2727
-   */
-  @Test
-  public void test2727() throws Exception {
-    // Test being able to have > 1 set of edits in the recovered.edits directory.
-    // Ensure edits are replayed properly.
-    final TableName tableName =
-        TableName.valueOf("test2727");
-
-    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-    HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
-    Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
-    deleteDir(basedir);
-
-    HTableDescriptor htd = createBasic3FamilyHTD(tableName);
-    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
-    HBaseTestingUtility.closeRegionAndWAL(region2);
-    final byte [] rowName = tableName.getName();
-
-    WAL wal1 = createWAL(this.conf);
-    // Add 1k to each family.
-    final int countPerFamily = 1000;
-
-    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
-        Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : htd.getFamiliesKeys()) {
-      scopes.put(fam, 0);
-    }
-    for (HColumnDescriptor hcd: htd.getFamilies()) {
-      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
-          wal1, htd, mvcc, scopes);
-    }
-    wal1.shutdown();
-    runWALSplit(this.conf);
-
-    WAL wal2 = createWAL(this.conf);
-    // Add 1k to each family.
-    for (HColumnDescriptor hcd: htd.getFamilies()) {
-      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
-          ee, wal2, htd, mvcc, scopes);
-    }
-    wal2.shutdown();
-    runWALSplit(this.conf);
-
-    WAL wal3 = createWAL(this.conf);
-    try {
-      HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
-      long seqid = region.getOpenSeqNum();
-      // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
-      // When opened, this region would apply 6k edits, and increment the sequenceId by 1
-      assertTrue(seqid > mvcc.getWritePoint());
-      assertEquals(seqid - 1, mvcc.getWritePoint());
-      LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
-          + mvcc.getReadPoint());
-
-      // TODO: Scan all.
-      region.close();
-    } finally {
-      wal3.close();
-    }
-  }
-
-  /**
-   * Test case of HRegion that is only made out of bulk loaded files. Assert
-   * that we don't 'crash'.
-   * @throws IOException
-   * @throws IllegalAccessException
-   * @throws NoSuchFieldException
-   * @throws IllegalArgumentException
-   * @throws SecurityException
-   */
-  @Test
-  public void testRegionMadeOfBulkLoadedFilesOnly()
-      throws IOException, SecurityException, IllegalArgumentException,
-      NoSuchFieldException, IllegalAccessException, InterruptedException {
-    final TableName tableName =
-        TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
-    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
-    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
-    deleteDir(basedir);
-    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
-    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
-    HBaseTestingUtility.closeRegionAndWAL(region2);
-    WAL wal = createWAL(this.conf);
-    Region region = HRegion.openHRegion(hri, htd, wal, this.conf);
-
-    byte [] family = htd.getFamilies().iterator().next().getName();
-    Path f = new Path(basedir, "hfile");
-    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
-        Bytes.toBytes("z"), 10);
-    List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
-    hfs.add(Pair.newPair(family, f.toString()));
-    region.bulkLoadHFiles(hfs, true, null);
-
-    // Add an edit so something in the WAL
-    byte [] row = tableName.getName();
-    region.put((new Put(row)).addColumn(family, family, family));
-    wal.sync();
-    final int rowsInsertedCount = 11;
-
-    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
-
-    // Now 'crash' the region by stealing its wal
-    final Configuration newConf = HBaseConfiguration.create(this.conf);
-    User user = HBaseTestingUtility.getDifferentUser(newConf,
-        tableName.getNameAsString());
-    user.runAs(new PrivilegedExceptionAction() {
-      @Override
-      public Object run() throws Exception {
-        runWALSplit(newConf);
-        WAL wal2 = createWAL(newConf);
-
-        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
-            hbaseRootDir, hri, htd, wal2);
-        long seqid2 = region2.getOpenSeqNum();
-        assertTrue(seqid2 > -1);
-        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
-
-        // I can't close wal1. Its been appropriated when we split.
-        region2.close();
-        wal2.close();
-        return null;
-      }
-    });
-  }
-
-  /**
-   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
-   * files) and an edit in the memstore.
-   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
-   * from being replayed"
-   * @throws IOException
-   * @throws IllegalAccessException
-   * @throws NoSuchFieldException
-   * @throws IllegalArgumentException
-   * @throws SecurityException
-   */
-  @Test
-  public void testCompactedBulkLoadedFiles()
-      throws IOException, SecurityException, IllegalArgumentException,
-      NoSuchFieldException, IllegalAccessException, InterruptedException {
-    final TableName tableName =
-        TableName.valueOf("testCompactedBulkLoadedFiles");
-    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
-    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
-    deleteDir(basedir);
-    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
-    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
-    HBaseTestingUtility.closeRegionAndWAL(region2);
-    WAL wal = createWAL(this.conf);
-    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
-
-    // Add an edit so something in the WAL
-    byte [] row = tableName.getName();
-    byte [] family = htd.getFamilies().iterator().next().getName();
-    region.put((new Put(row)).addColumn(family, family, family));
-    wal.sync();
-
-    List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
-    for (int i = 0; i < 3; i++) {
-      Path f = new Path(basedir, "hfile" + i);
-      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
-          Bytes.toBytes(i + "50"), 10);
-      hfs.add(Pair.newPair(family, f.toString()));
-    }
-    region.bulkLoadHFiles(hfs, true, null);
-    final int rowsInsertedCount = 31;
-    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
-
-    // major compact to turn all the bulk loaded files into one normal file
-    region.compact(true);
-    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
-
-    // Now 'crash' the region by stealing its wal
-    final Configuration newConf = HBaseConfiguration.create(this.conf);
-    User user = HBaseTestingUtility.getDifferentUser(newConf,
-        tableName.getNameAsString());
-    user.runAs(new PrivilegedExceptionAction() {
-      @Override
-      public Object run() throws Exception {
-        runWALSplit(newConf);
-        WAL wal2 = createWAL(newConf);
-
-        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
-            hbaseRootDir, hri, htd, wal2);
-        long seqid2 = region2.getOpenSeqNum();
-        assertTrue(seqid2 > -1);
-        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
-
-        // I can't close wal1. Its been appropriated when we split.
-        region2.close();
-        wal2.close();
-        return null;
-      }
-    });
-  }
-
-
-  /**
-   * Test writing edits into an HRegion, closing it, splitting logs, opening
-   * Region again. Verify seqids.
- * @throws IOException - * @throws IllegalAccessException - * @throws NoSuchFieldException - * @throws IllegalArgumentException - * @throws SecurityException - */ - @Test - public void testReplayEditsWrittenViaHRegion() - throws IOException, SecurityException, IllegalArgumentException, - NoSuchFieldException, IllegalAccessException, InterruptedException { - final TableName tableName = - TableName.valueOf("testReplayEditsWrittenViaHRegion"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region3); - // Write countPerFamily edits into the three families. Do a flush on one - // of the families during the load of edits so its seqid is not same as - // others to test we do right thing when different seqids. - WAL wal = createWAL(this.conf); - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - long seqid = region.getOpenSeqNum(); - boolean first = true; - for (HColumnDescriptor hcd: htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - if (first) { - // If first, so we have at least one family w/ different seqid to rest. - region.flush(true); - first = false; - } - } - // Now assert edits made it in. - final Get g = new Get(rowName); - Result result = region.get(g); - assertEquals(countPerFamily * htd.getFamilies().size(), - result.size()); - // Now close the region (without flush), split the log, reopen the region and assert that - // replay of log has the correct effect, that our seqids are calculated correctly so - // all edits in logs are seen as 'stale'/old. - region.close(true); - wal.shutdown(); - runWALSplit(this.conf); - WAL wal2 = createWAL(this.conf); - HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); - long seqid2 = region2.getOpenSeqNum(); - assertTrue(seqid + result.size() < seqid2); - final Result result1b = region2.get(g); - assertEquals(result.size(), result1b.size()); - - // Next test. Add more edits, then 'crash' this region by stealing its wal - // out from under it and assert that replay of the log adds the edits back - // correctly when region is opened again. - for (HColumnDescriptor hcd: htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); - } - // Get count of edits. - final Result result2 = region2.get(g); - assertEquals(2 * result.size(), result2.size()); - wal2.sync(); - final Configuration newConf = HBaseConfiguration.create(this.conf); - User user = HBaseTestingUtility.getDifferentUser(newConf, - tableName.getNameAsString()); - user.runAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - runWALSplit(newConf); - FileSystem newFS = FileSystem.get(newConf); - // Make a new wal for new region open. 
- WAL wal3 = createWAL(newConf); - final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); - HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { - @Override - protected boolean restoreEdit(Store s, Cell cell) { - boolean b = super.restoreEdit(s, cell); - countOfRestoredEdits.incrementAndGet(); - return b; - } - }; - long seqid3 = region3.initialize(); - Result result3 = region3.get(g); - // Assert that count of cells is same as before crash. - assertEquals(result2.size(), result3.size()); - assertEquals(htd.getFamilies().size() * countPerFamily, - countOfRestoredEdits.get()); - - // I can't close wal1. Its been appropriated when we split. - region3.close(); - wal3.close(); - return null; - } - }); - } - - /** - * Test that we recover correctly when there is a failure in between the - * flushes. i.e. Some stores got flushed but others did not. - * - * Unfortunately, there is no easy hook to flush at a store level. The way - * we get around this is by flushing at the region level, and then deleting - * the recently flushed store file for one of the Stores. This would put us - * back in the situation where all but that store got flushed and the region - * died. - * - * We restart Region again, and verify that the edits were replayed. - * - * @throws IOException - * @throws IllegalAccessException - * @throws NoSuchFieldException - * @throws IllegalArgumentException - * @throws SecurityException - */ - @Test - public void testReplayEditsAfterPartialFlush() - throws IOException, SecurityException, IllegalArgumentException, - NoSuchFieldException, IllegalAccessException, InterruptedException { - final TableName tableName = - TableName.valueOf("testReplayEditsWrittenViaHRegion"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic3FamilyHTD(tableName); - HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region3); - // Write countPerFamily edits into the three families. Do a flush on one - // of the families during the load of edits so its seqid is not same as - // others to test we do right thing when different seqids. - WAL wal = createWAL(this.conf); - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - long seqid = region.getOpenSeqNum(); - for (HColumnDescriptor hcd: htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - } - - // Now assert edits made it in. - final Get g = new Get(rowName); - Result result = region.get(g); - assertEquals(countPerFamily * htd.getFamilies().size(), - result.size()); - - // Let us flush the region - region.flush(true); - region.close(true); - wal.shutdown(); - - // delete the store files in the second column family to simulate a failure - // in between the flushcache(); - // we have 3 families. killing the middle one ensures that taking the maximum - // will make us fail. 
-    int cf_count = 0;
-    for (HColumnDescriptor hcd: htd.getFamilies()) {
-      cf_count++;
-      if (cf_count == 2) {
-        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
-      }
-    }
-
-
-    // Let us try to split and recover
-    runWALSplit(this.conf);
-    WAL wal2 = createWAL(this.conf);
-    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
-    long seqid2 = region2.getOpenSeqNum();
-    assertTrue(seqid + result.size() < seqid2);
-
-    final Result result1b = region2.get(g);
-    assertEquals(result.size(), result1b.size());
-  }
-
-
-  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
-  // Only throws exception if throwExceptionWhenFlushing is set true.
-  public static class CustomStoreFlusher extends DefaultStoreFlusher {
-    // Switch between throw and not throw exception in flush
-    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
-
-    public CustomStoreFlusher(Configuration conf, Store store) {
-      super(conf, store);
-    }
-    @Override
-    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-        MonitoredTask status, ThroughputController throughputController) throws IOException {
-      if (throwExceptionWhenFlushing.get()) {
-        throw new IOException("Simulated exception by tests");
-      }
-      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController);
-    }
-
-  };
-
-  /**
-   * Test that we could recover the data correctly after aborting flush. In the
-   * test, first we abort flush after writing some data, then writing more data
-   * and flush again, at last verify the data.
-   * @throws IOException
-   */
-  @Test
-  public void testReplayEditsAfterAbortingFlush() throws IOException {
-    final TableName tableName =
-        TableName.valueOf("testReplayEditsAfterAbortingFlush");
-    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
-    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
-    deleteDir(basedir);
-    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
-    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
-    HBaseTestingUtility.closeRegionAndWAL(region3);
-    // Write countPerFamily edits into the three families. Do a flush on one
-    // of the families during the load of edits so its seqid is not same as
-    // others to test we do right thing when different seqids.
-    WAL wal = createWAL(this.conf);
-    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
-    Mockito.doReturn(false).when(rsServices).isAborted();
-    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
-    Configuration customConf = new Configuration(this.conf);
-    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
-        CustomStoreFlusher.class.getName());
-    HRegion region =
-        HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
-    int writtenRowCount = 10;
-    List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
-        htd.getFamilies());
-    for (int i = 0; i < writtenRowCount; i++) {
-      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
-      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
-          Bytes.toBytes("val"));
-      region.put(put);
-    }
-
-    // Now assert edits made it in.
-    RegionScanner scanner = region.getScanner(new Scan());
-    assertEquals(writtenRowCount, getScannedCount(scanner));
-
-    // Let us flush the region
-    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
-    try {
-      region.flush(true);
-      fail("Injected exception hasn't been thrown");
-    } catch (Throwable t) {
-      LOG.info("Expected simulated exception when flushing region,"
-          + t.getMessage());
-      // simulated to abort server
-      Mockito.doReturn(true).when(rsServices).isAborted();
-      region.setClosing(false); // region normally does not accept writes after
-      // DroppedSnapshotException. We mock around it for this test.
-    }
-    // writing more data
-    int moreRow = 10;
-    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
-      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
-      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
-          Bytes.toBytes("val"));
-      region.put(put);
-    }
-    writtenRowCount += moreRow;
-    // call flush again
-    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
-    try {
-      region.flush(true);
-    } catch (IOException t) {
-      LOG.info("Expected exception when flushing region because server is stopped,"
-          + t.getMessage());
-    }
-
-    region.close(true);
-    wal.shutdown();
-
-    // Let us try to split and recover
-    runWALSplit(this.conf);
-    WAL wal2 = createWAL(this.conf);
-    Mockito.doReturn(false).when(rsServices).isAborted();
-    HRegion region2 =
-        HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
-    scanner = region2.getScanner(new Scan());
-    assertEquals(writtenRowCount, getScannedCount(scanner));
-  }
-
-  private int getScannedCount(RegionScanner scanner) throws IOException {
-    int scannedCount = 0;
-    List<Cell> results = new ArrayList<Cell>();
-    while (true) {
-      boolean existMore = scanner.next(results);
-      if (!results.isEmpty())
-        scannedCount++;
-      if (!existMore)
-        break;
-      results.clear();
-    }
-    return scannedCount;
-  }
-
-  /**
-   * Create an HRegion with the result of a WAL split and test we only see the
-   * good edits
-   * @throws Exception
-   */
-  @Test
-  public void testReplayEditsWrittenIntoWAL() throws Exception {
-    final TableName tableName =
-        TableName.valueOf("testReplayEditsWrittenIntoWAL");
-    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
-    final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
-    deleteDir(basedir);
-
-    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
-    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
-    HBaseTestingUtility.closeRegionAndWAL(region2);
-    final WAL wal = createWAL(this.conf);
-    final byte[] rowName = tableName.getName();
-    final byte[] regionName = hri.getEncodedNameAsBytes();
-
-    // Add 1k to each family.
-    final int countPerFamily = 1000;
-    Set<byte[]> familyNames = new HashSet<byte[]>();
-    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
-        Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : htd.getFamiliesKeys()) {
-      scopes.put(fam, 0);
-    }
-    for (HColumnDescriptor hcd: htd.getFamilies()) {
-      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
-          ee, wal, htd, mvcc, scopes);
-      familyNames.add(hcd.getName());
-    }
-
-    // Add a cache flush, shouldn't have any effect
-    wal.startCacheFlush(regionName, familyNames);
-    wal.completeCacheFlush(regionName);
-
-    // Add an edit to another family, should be skipped.
-    WALEdit edit = new WALEdit();
-    long now = ee.currentTime();
-    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
-        now, rowName));
-    wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
-        true);
-
-    // Delete the c family to verify deletes make it over.
-    edit = new WALEdit();
-    now = ee.currentTime();
-    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
-    wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
-        true);
-
-    // Sync.
-    wal.sync();
-    // Make a new conf and a new fs for the splitter to run on so we can take
-    // over old wal.
-    final Configuration newConf = HBaseConfiguration.create(this.conf);
-    User user = HBaseTestingUtility.getDifferentUser(newConf,
-        ".replay.wal.secondtime");
-    user.runAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        runWALSplit(newConf);
-        FileSystem newFS = FileSystem.get(newConf);
-        // 100k seems to make for about 4 flushes during HRegion#initialize.
-        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
-        // Make a new wal for new region.
-        WAL newWal = createWAL(newConf);
-        final AtomicInteger flushcount = new AtomicInteger(0);
-        try {
-          final HRegion region =
-              new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
-            @Override
-            protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
-                final Collection<Store> storesToFlush, MonitoredTask status,
-                boolean writeFlushWalMarker)
-                throws IOException {
-              LOG.info("InternalFlushCache Invoked");
-              FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
-                  Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
-              flushcount.incrementAndGet();
-              return fs;
-            }
-          };
-          // The seq id this region has opened up with
-          long seqid = region.initialize();
-
-          // The mvcc readpoint of from inserting data.
-          long writePoint = mvcc.getWritePoint();
-
-          // We flushed during init.
- assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); - assertTrue((seqid - 1) == writePoint); - - Get get = new Get(rowName); - Result result = region.get(get); - // Make sure we only see the good edits - assertEquals(countPerFamily * (htd.getFamilies().size() - 1), - result.size()); - region.close(); - } finally { - newWal.close(); - } - return null; - } - }); - } - - @Test - // the following test is for HBASE-6065 - public void testSequentialEditLogSeqNum() throws IOException { - final TableName tableName = TableName.valueOf(currentTest.getMethodName()); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = - FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic1FamilyHTD(tableName); - - // Mock the WAL - MockWAL wal = createMockWAL(); - - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - for (HColumnDescriptor hcd : htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - } - - // Let us flush the region - // But this time completeflushcache is not yet done - region.flush(true); - for (HColumnDescriptor hcd : htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); - } - long lastestSeqNumber = region.getReadPoint(null); - // get the current seq no - wal.doCompleteCacheFlush = true; - // allow complete cache flush with the previous seq number got after first - // set of edits. - wal.completeCacheFlush(hri.getEncodedNameAsBytes()); - wal.shutdown(); - FileStatus[] listStatus = wal.getFiles(); - assertNotNull(listStatus); - assertTrue(listStatus.length > 0); - WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], - this.fs, this.conf, null, null, null, mode, wals); - FileStatus[] listStatus1 = this.fs.listStatus( - new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), - "recovered.edits")), new PathFilter() { - @Override - public boolean accept(Path p) { - if (WALSplitter.isSequenceIdFile(p)) { - return false; - } - return true; - } - }); - int editCount = 0; - for (FileStatus fileStatus : listStatus1) { - editCount = Integer.parseInt(fileStatus.getPath().getName()); - } - // The sequence number should be same - assertEquals( - "The sequence number of the recoverd.edits and the current edit seq should be same", - lastestSeqNumber, editCount); - } - - /** - * testcase for https://issues.apache.org/jira/browse/HBASE-15252 - */ - @Test - public void testDatalossWhenInputError() throws IOException, InstantiationException, - IllegalAccessException { - final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); - deleteDir(basedir); - final byte[] rowName = tableName.getName(); - final int countPerFamily = 10; - final HTableDescriptor htd = createBasic1FamilyHTD(tableName); - HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - Path regionDir = region1.getRegionFileSystem().getRegionDir(); - HBaseTestingUtility.closeRegionAndWAL(region1); - - WAL wal = createWAL(this.conf); - HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - for (HColumnDescriptor hcd : htd.getFamilies()) { - addRegionEdits(rowName, hcd.getName(), 
countPerFamily, this.ee, region, "x"); - } - // Now assert edits made it in. - final Get g = new Get(rowName); - Result result = region.get(g); - assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); - // Now close the region (without flush), split the log, reopen the region and assert that - // replay of log has the correct effect. - region.close(true); - wal.shutdown(); - - runWALSplit(this.conf); - - // here we let the DFSInputStream throw an IOException just after the WALHeader. - Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first(); - FSDataInputStream stream = fs.open(editFile); - stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); - Class logReaderClass = - conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - DefaultWALProvider.Reader.class); - DefaultWALProvider.Reader reader = logReaderClass.newInstance(); - reader.init(this.fs, editFile, conf, stream); - final long headerLength = stream.getPos(); - reader.close(); - FileSystem spyFs = spy(this.fs); - doAnswer(new Answer() { - - @Override - public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { - FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); - Field field = FilterInputStream.class.getDeclaredField("in"); - field.setAccessible(true); - final DFSInputStream in = (DFSInputStream) field.get(stream); - DFSInputStream spyIn = spy(in); - doAnswer(new Answer() { - - private long pos; - - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - if (pos >= headerLength) { - throw new IOException("read over limit"); - } - int b = (Integer) invocation.callRealMethod(); - if (b > 0) { - pos += b; - } - return b; - } - }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class)); - doAnswer(new Answer() { - - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - invocation.callRealMethod(); - in.close(); - return null; - } - }).when(spyIn).close(); - field.set(stream, spyIn); - return stream; - } - }).when(spyFs).open(eq(editFile)); - - WAL wal2 = createWAL(this.conf); - HRegion region2; - try { - // log replay should fail due to the IOException, otherwise we may lose data. - region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); - assertEquals(result.size(), region2.get(g).size()); - } catch (IOException e) { - assertEquals("read over limit", e.getMessage()); - } - region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); - assertEquals(result.size(), region2.get(g).size()); - } - - /** - * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 
- */ - private void testNameConflictWhenSplit(boolean largeFirst) throws IOException { - final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); - final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); - deleteDir(basedir); - - final HTableDescriptor htd = createBasic1FamilyHTD(tableName); - NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); - for (byte[] fam : htd.getFamiliesKeys()) { - scopes.put(fam, 0); - } - HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - HBaseTestingUtility.closeRegionAndWAL(region); - final byte[] family = htd.getColumnFamilies()[0].getName(); - final byte[] rowName = tableName.getName(); - FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); - FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); - - Path largeFile = new Path(logDir, "wal-1"); - Path smallFile = new Path(logDir, "wal-2"); - writerWALFile(largeFile, Arrays.asList(entry1, entry2)); - writerWALFile(smallFile, Arrays.asList(entry2)); - FileStatus first, second; - if (largeFirst) { - first = fs.getFileStatus(largeFile); - second = fs.getFileStatus(smallFile); - } else { - first = fs.getFileStatus(smallFile); - second = fs.getFileStatus(largeFile); - } - WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, - RecoveryMode.LOG_SPLITTING, wals); - WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, - RecoveryMode.LOG_SPLITTING, wals); - WAL wal = createWAL(this.conf); - region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); - assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); - assertEquals(2, region.get(new Get(rowName)).size()); - } - - @Test - public void testNameConflictWhenSplit0() throws IOException { - testNameConflictWhenSplit(true); - } - - @Test - public void testNameConflictWhenSplit1() throws IOException { - testNameConflictWhenSplit(false); - } - - static class MockWAL extends FSHLog { - boolean doCompleteCacheFlush = false; - - public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) - throws IOException { - super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); - } - - @Override - public void completeCacheFlush(byte[] encodedRegionName) { - if (!doCompleteCacheFlush) { - return; - } - super.completeCacheFlush(encodedRegionName); - } - } - - private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) { - HTableDescriptor htd = new HTableDescriptor(tableName); - HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); - htd.addFamily(a); - return htd; - } - - private MockWAL createMockWAL() throws IOException { - MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); - // Set down maximum recovery so we dfsclient doesn't linger retrying something - // long gone. - HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); - return wal; - } - - // Flusher used in this test. Keep count of how often we are called and - // actually run the flush inside here. 
-  class TestFlusher implements FlushRequester {
-    private HRegion r;
-
-    @Override
-    public void requestFlush(Region region, boolean force) {
-      try {
-        r.flush(force);
-      } catch (IOException e) {
-        throw new RuntimeException("Exception flushing", e);
-      }
-    }
-
-    @Override
-    public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) {
-      // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void registerFlushRequestListener(FlushRequestListener listener) {
-
-    }
-
-    @Override
-    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
-      return false;
-    }
-
-    @Override
-    public void setGlobalMemstoreLimit(long globalMemStoreSize) {
-
-    }
-  }
-
-  private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
-      final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
-    return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
-  }
-
-  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
-      int index) {
-    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
-    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
-    WALEdit edit = new WALEdit();
-    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
-    return edit;
-  }
-
-  private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
-      byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
-      int index, NavigableMap<byte[], Integer> scopes) throws IOException {
-    FSWALEntry entry =
-        new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
-          rowName, family, ee, index), hri, true);
-    entry.stampRegionSequenceId();
-    return entry;
-  }
-
-  private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
-      final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
-      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
-      NavigableMap<byte[], Integer> scopes) throws IOException {
-    for (int j = 0; j < count; j++) {
-      wal.append(hri, createWALKey(tableName, hri, mvcc, scopes),
-        createWALEdit(rowName, family, ee, j), true);
-    }
-    wal.sync();
-  }
-
-  static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
-      EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
-    List<Put> puts = new ArrayList<Put>();
-    for (int j = 0; j < count; j++) {
-      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
-      Put p = new Put(rowName);
-      p.addColumn(family, qualifier, ee.currentTime(), rowName);
-      r.put(p);
-      puts.add(p);
-    }
-    return puts;
-  }
-
-  /*
-   * Creates an HRI around an HTD that has tableName and three
-   * column families named 'a','b', and 'c'.
-   * @param tableName Name of table to use when we create HTableDescriptor.
-   */
-  private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
-    return new HRegionInfo(tableName, null, null, false);
-  }
-
-  /*
-   * Run the split. Verify only single split file made.
-   * @param c
-   * @return The single split file made
-   * @throws IOException
-   */
-  private Path runWALSplit(final Configuration c) throws IOException {
-    List<Path> splits = WALSplitter.split(
-      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
-    // Split should generate only 1 file since there's only 1 region
-    assertEquals("splits=" + splits, 1, splits.size());
-    // Make sure the file exists
-    assertTrue(fs.exists(splits.get(0)));
-    LOG.info("Split file=" + splits.get(0));
-    return splits.get(0);
-  }
-
-  /*
-   * @param c
-   * @return WAL with retries set down from 5 to 1 only.
-   * @throws IOException
-   */
-  private WAL createWAL(final Configuration c) throws IOException {
-    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
-    // Set down maximum recovery so we dfsclient doesn't linger retrying something
-    // long gone.
-    HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
-    return wal;
-  }
-
-  private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
-    htd.addFamily(a);
-    HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
-    htd.addFamily(b);
-    HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
-    htd.addFamily(c);
-    return htd;
-  }
-
-  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException {
-    fs.mkdirs(file.getParent());
-    ProtobufLogWriter writer = new ProtobufLogWriter();
-    writer.init(fs, file, conf, true);
-    for (FSWALEntry entry : entries) {
-      writer.append(entry);
-    }
-    writer.sync();
-    writer.close();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
+    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
+    AbstractTestWALReplay.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
index b225554..e6856f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
@@ -21,19 +21,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 /**
  * Enables compression and runs the TestWALReplay tests.
  */
-@Category({RegionServerTests.class, MediumTests.class})
-public class TestWALReplayCompressed extends TestWALReplay {
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALReplayCompressed extends AbstractTestWALReplay {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TestWALReplay.setUpBeforeClass();
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
+    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
     conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    AbstractTestWALReplay.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 0eef3b1..8427c95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -138,6 +138,8 @@ public class TestWALFactory {
       "dfs.client.block.recovery.retries", 1);
     TEST_UTIL.getConfiguration().setInt(
       "hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
+    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
     TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
       SampleRegionWALObserver.class.getName());
     TEST_UTIL.startMiniDFSCluster(3);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index beac9e2..4837f50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec;
@@ -129,7 +130,7 @@ public class TestWALReaderOnSecureWAL {
       conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
     }
   }
-  
+
   @Test()
   public void testWALReaderOnSecureWAL() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
@@ -137,6 +138,8 @@
       WAL.Reader.class);
     conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
       WALProvider.Writer.class);
+    conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
+      WALProvider.AsyncWriter.class);
     conf.setBoolean(WAL_ENCRYPTION, true);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
@@ -157,7 +160,7 @@
     } catch (IOException ioe) {
       // expected IOE
     }
-  
+
     FileStatus[] listStatus = fs.listStatus(walPath.getParent());
     RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
@@ -174,7 +177,7 @@
     }
     wals.close();
   }
-  
+
   @Test()
   public void testSecureWALReaderOnWAL() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
@@ -202,7 +205,7 @@
     } catch (IOException ioe) {
       assertFalse(true);
     }
-  
+
     FileStatus[] listStatus = fs.listStatus(walPath.getParent());
     RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
-- 
1.9.1
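
Usage note (an illustrative sketch, not part of the patch): with the defaultProvider mapping now
resolving to AsyncFSWALProvider, a deployment or test that still needs the FSHLog-based WAL can
pin the provider explicitly through hbase.wal.provider, exactly as the modified tests above do.
The class name PinFilesystemWALProvider below is hypothetical; WALFactory.WAL_PROVIDER and the
"filesystem" provider key are taken from the patch itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.WALFactory;

public class PinFilesystemWALProvider {
  public static void main(String[] args) {
    // Start from the standard HBase configuration; after this patch the
    // unset/default WAL provider resolves to AsyncFSWALProvider.
    Configuration conf = HBaseConfiguration.create();
    // Fall back to the FSHLog-based provider, as TestWALReplayCompressed and
    // TestMobStoreScanner do above (e.g. while AsyncFSWAL cannot yet handle
    // very large edits).
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    System.out.println(conf.get(WALFactory.WAL_PROVIDER));
  }
}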