From 1b07bd213b22c1e77fae45f74dc9b49b173d2658 Mon Sep 17 00:00:00 2001 From: chenheng Date: Tue, 26 Jan 2016 13:58:43 +0800 Subject: [PATCH] HBASE-15169 Backport HBASE-14362 'TestWALProcedureStoreOnHDFS is super duper flaky' to branch-1.1 --- .../procedure/TestWALProcedureStoreOnHDFS.java | 189 ++++++++++----------- 1 file changed, 92 insertions(+), 97 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java index e26418d..a5a3620 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java @@ -18,45 +18,24 @@ package org.apache.hadoop.hbase.master.procedure; -import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ModifyRegionUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -71,7 +50,18 @@ public class TestWALProcedureStoreOnHDFS { private WALProcedureStore store; - private static void setupConf(Configuration conf) { + private ProcedureStore.ProcedureStoreListener stopProcedureListener = new ProcedureStore.ProcedureStoreListener() { + @Override + public void postSync() {} + + @Override + public void abortProcess() { + LOG.fatal("Abort the Procedure Store"); + store.stop(true); + } + }; + + private static void initConfig(Configuration conf) { conf.setInt("dfs.replication", 3); conf.setInt("dfs.namenode.replication.min", 3); @@ -81,29 +71,16 @@ public class TestWALProcedureStoreOnHDFS { conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10); } - @Before public void setup() throws Exception { - setupConf(UTIL.getConfiguration()); MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); - Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); - store = ProcedureTestingUtility.createWalStore( - UTIL.getConfiguration(), dfs.getFileSystem(), logDir); - store.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() {} - - @Override - public void abortProcess() { - LOG.fatal("Abort the Procedure Store"); - store.stop(true); - } - }); + store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), + logDir); + store.registerListener(stopProcedureListener); store.start(8); store.recoverLease(); } - @After public void tearDown() throws Exception { store.stop(false); UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true); @@ -117,84 +94,102 @@ public class TestWALProcedureStoreOnHDFS { @Test(timeout=60000, expected=RuntimeException.class) public void testWalAbortOnLowReplication() throws Exception { - assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); - - LOG.info("Stop DataNode"); - UTIL.getDFSCluster().stopDataNode(0); - assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + initConfig(UTIL.getConfiguration()); + setup(); + try { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); - store.insert(new TestProcedure(1, -1), null); - for (long i = 2; store.isRunning(); ++i) { + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - store.insert(new TestProcedure(i, -1), null); - Thread.sleep(100); + + store.insert(new TestProcedure(1, -1), null); + for (long i = 2; store.isRunning(); ++i) { + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + store.insert(new TestProcedure(i, -1), null); + Thread.sleep(100); + } + assertFalse(store.isRunning()); + fail("The store.insert() should throw an exeption"); + } finally { + tearDown(); } - assertFalse(store.isRunning()); - fail("The store.insert() should throw an exeption"); } @Test(timeout=60000) public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception { - assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); + initConfig(UTIL.getConfiguration()); + setup(); + try { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); + store.registerListener(new ProcedureStore.ProcedureStoreListener() { + @Override + public void postSync() { + Threads.sleepWithoutInterrupt(2000); + } - store.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() { - Threads.sleepWithoutInterrupt(2000); + @Override + public void abortProcess() {} + }); + + final AtomicInteger reCount = new AtomicInteger(0); + Thread[] thread = new Thread[store.getNumThreads() * 2 + 1]; + for (int i = 0; i < thread.length; ++i) { + final long procId = i + 1; + thread[i] = new Thread() { + public void run() { + try { + LOG.debug("[S] INSERT " + procId); + store.insert(new TestProcedure(procId, -1), null); + LOG.debug("[E] INSERT " + procId); + } catch (RuntimeException e) { + reCount.incrementAndGet(); + LOG.debug("[F] INSERT " + procId + ": " + e.getMessage()); + } + } + }; + thread[i].start(); } - @Override - public void abortProcess() {} - }); - - final AtomicInteger reCount = new AtomicInteger(0); - Thread[] thread = new Thread[store.getNumThreads() * 2 + 1]; - for (int i = 0; i < thread.length; ++i) { - final long procId = i + 1; - thread[i] = new Thread() { - public void run() { - try { - LOG.debug("[S] INSERT " + procId); - store.insert(new TestProcedure(procId, -1), null); - LOG.debug("[E] INSERT " + procId); - } catch (RuntimeException e) { - reCount.incrementAndGet(); - LOG.debug("[F] INSERT " + procId + ": " + e.getMessage()); - } - } - }; - thread[i].start(); - } + Thread.sleep(1000); + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - Thread.sleep(1000); - LOG.info("Stop DataNode"); - UTIL.getDFSCluster().stopDataNode(0); - assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + for (int i = 0; i < thread.length; ++i) { + thread[i].join(); + } - for (int i = 0; i < thread.length; ++i) { - thread[i].join(); + assertFalse(store.isRunning()); + assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() && + reCount.get() < thread.length); + } finally { + tearDown(); } - - assertFalse(store.isRunning()); - assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() && - reCount.get() < thread.length); } @Test(timeout=60000) public void testWalRollOnLowReplication() throws Exception { - int dnCount = 0; - store.insert(new TestProcedure(1, -1), null); - UTIL.getDFSCluster().restartDataNode(dnCount); - for (long i = 2; i < 100; ++i) { - store.insert(new TestProcedure(i, -1), null); - waitForNumReplicas(3); - Thread.sleep(100); - if ((i % 30) == 0) { - LOG.info("Restart Data Node"); - UTIL.getDFSCluster().restartDataNode(++dnCount % 3); + initConfig(UTIL.getConfiguration()); + UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1); + setup(); + try { + int dnCount = 0; + store.insert(new TestProcedure(1, -1), null); + UTIL.getDFSCluster().restartDataNode(dnCount); + for (long i = 2; i < 100; ++i) { + store.insert(new TestProcedure(i, -1), null); + waitForNumReplicas(3); + Thread.sleep(100); + if ((i % 30) == 0) { + LOG.info("Restart Data Node"); + UTIL.getDFSCluster().restartDataNode(++dnCount % 3); + } } + assertTrue(store.isRunning()); + } finally { + tearDown(); } - assertTrue(store.isRunning()); } public void waitForNumReplicas(int numReplicas) throws Exception { -- 1.9.3 (Apple Git-50)