From 43bb073ee760cd67bf5b56b1c68241a866468a16 Mon Sep 17 00:00:00 2001 From: manukranthk Date: Tue, 23 Sep 2014 19:15:09 -0700 Subject: [PATCH] Add a test case for Preemptive Fast Fail --- .../hadoop/hbase/client/TestFromClientSide3.java | 139 +++++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 7b15544..361e540 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -24,13 +24,22 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -432,4 +441,134 @@ public class TestFromClientSide3 { assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); table.close(); } + + @Test + public void testClientRelearning() throws IOException, InterruptedException { + Admin admin = TEST_UTIL.getHBaseAdmin(); + + final String tableName = "testClientRelearningExperiment"; + HTableDescriptor desc = new HTableDescriptor( + TableName.valueOf(Bytes.toBytes(tableName))); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc); + + int nThreads = 200; + java.util.concurrent.ExecutorService service = + Executors.newFixedThreadPool(nThreads); + final CountDownLatch continueOtherHalf = new CountDownLatch(1); + final CountDownLatch doneHalfway = new CountDownLatch(nThreads); + final int SLEEPTIME = 1000; + TEST_UTIL.getConfiguration().setLong( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME); + final AtomicInteger numSuccessfullThreads = new AtomicInteger(0); + final AtomicInteger numFailedThreads = new AtomicInteger(0); + + // The total time taken for the threads to perform the second put; + final AtomicLong totalTimeTaken = new AtomicLong(0); + final AtomicInteger numBlockedWorkers = new AtomicInteger(0); + + List> futures = new ArrayList>(); + for (int i = 0; i < nThreads; i++) { + final int curThread = i; + futures.add(service.submit(new Callable() { + private Random r = new Random(); + + @SuppressWarnings({ "deprecation" }) + @Override + public Boolean call() throws Exception { + try (Table table = + new HTable(TEST_UTIL.getConfiguration(), tableName)) { + Thread.sleep(10 * curThread); // To add a cascading effect. + table.setAutoFlushTo(true); + + byte[] row = new byte[20]; + // First Put + r.nextBytes(row); + Put put = new Put(row); + put.add(FAMILY, null, row); + try { + table.put(put); + } catch (Exception e) { + LOG.debug("Put failed : ", e); + return false; + } + + doneHalfway.countDown(); + continueOtherHalf.await(); + + long startTime = System.currentTimeMillis(); + // Second Put + r.nextBytes(row); + put = new Put(row); + put.add(FAMILY, null, row); + try { + table.put(put); + } catch (Exception e) { + LOG.debug("Put failed : ", e); + numFailedThreads.addAndGet(1); + return false; + } + numSuccessfullThreads.addAndGet(1); + long enTime = System.currentTimeMillis(); + totalTimeTaken.addAndGet(enTime - startTime); + if (enTime - startTime >= SLEEPTIME) { + numBlockedWorkers.addAndGet(1); + } + return true; + } + } + })); + } + + doneHalfway.await(); + ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus(); + + // Kill a regionserver or 2 + for (int i = 0; i < SLAVES; i++) { + TEST_UTIL.getHBaseCluster().abortRegionServer(0); + } + + // Let the threads continue going + continueOtherHalf.countDown(); + + Thread.sleep(2 * SLEEPTIME); + // Restore the cluster + TEST_UTIL.getHBaseCluster().restoreClusterStatus(status); + + int numThreadsReturnedFalse = 0; + int numThreadsReturnedTrue = 0; + int numThreadsThrewExceptions = 0; + for (Future f : futures) { + try { + numThreadsReturnedTrue += f.get() ? 1 : 0; + numThreadsReturnedFalse += f.get() ? 0 : 1; + } catch (Exception e) { + numThreadsThrewExceptions++; + } + } + LOG.debug("numThreadsReturnedFalse:" + numThreadsReturnedFalse + + " numThreadsReturnedTrue:" + numThreadsReturnedTrue + + " numThreadsThrewExceptions:" + numThreadsThrewExceptions + + " numFailedThreads:" + numFailedThreads.get() + + " numSuccessfullThreads:" + numSuccessfullThreads.get() + + " numBlockedWorkers:" + numBlockedWorkers.get() + + " totalTimeWaited: " + totalTimeTaken.get() / numBlockedWorkers.get()); + + assertEquals("The expected number of all the successfull and the failed " + + "threads should equal the total number of threads that we spawned", + nThreads, numFailedThreads.get() + numSuccessfullThreads.get()); + assertEquals("All the failures should be coming from the secondput failure", + numFailedThreads.get(), numThreadsReturnedFalse); + assertEquals("Number of threads that threw execution exceptions " + + "otherwise should be 0", numThreadsThrewExceptions, 0); + assertTrue("Only one thread should ideally be waiting for the dead " + + "regionservers to be coming back. numBlockedWorkers:" + + numBlockedWorkers.get(), numBlockedWorkers.get() <= 1); + assertEquals("The regionservers that returned true should equal to the" + + " number of successful threads", numThreadsReturnedTrue, + numSuccessfullThreads.get()); + + } } + + \ No newline at end of file -- 1.9.4