From 23368e3958f69c11e6e183094959240fe2a9bb6e Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Fri, 5 Jul 2013 15:04:40 -0700
Subject: [PATCH] add test for mttr

---
 ...grationTestRebalanceAndKillServersTargeted.java |    2 +-
 .../hadoop/hbase/mttr/IntegrationTestMTTR.java     |  345 ++++++++++++++++++++
 .../org/apache/hadoop/hbase/util/ChaosMonkey.java  |   69 +++-
 3 files changed, 403 insertions(+), 13 deletions(-)
 create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java

diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
index 1f4c279..e5fdd5b 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
@@ -74,7 +74,7 @@ public class IntegrationTestRebalanceAndKillServersTargeted extends IngestIntegr
     private static final long WAIT_AFTER_BALANCE_MS = 5 * 1000;
 
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       ClusterStatus status = this.cluster.getClusterStatus();
       List<ServerName> victimServers = new LinkedList<ServerName>(status.getServers());
       int liveCount = (int)Math.ceil(FRC_SERVERS_THAT_HOARD_AND_LIVE * victimServers.size());
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
new file mode 100644
index 0000000..f2f5ba7
--- /dev/null
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mttr;
+
+import com.google.common.base.Objects;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCluster;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ChaosMonkey;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test that measures mean time to recovery (MTTR). Each iteration restarts the
+ * region server holding a small test table while LoadTestTool generates background load, and
+ * records the longest single put and single scan against that table. The timings are only
+ * logged; no threshold is asserted.
+ */
+public class IntegrationTestMTTR {
+
+  private static final byte[] FAMILY = Bytes.toBytes("d");
+  private static final Log LOG = LogFactory.getLog(IntegrationTestMTTR.class);
+
+  /** Successful operations required after the restart has finished before a probe stops. */
+  private static final int SUCCESSES_AFTER_RESTART = 10;
+
+  private String tableName;
+  private byte[] tableNameBytes;
+
+  private String loadTableName;
+  private IntegrationTestingUtility util;
+  private HBaseCluster cluster;
+  private ExecutorService executorService;
+  private ChaosMonkey.Action restartRSAction;
+  private LoadTestTool loadTool;
+
+  /**
+   * Runs several restart iterations and logs the max put and scan time seen in each one.
+   */
+  @Test
+  public void testMTTR() throws Exception {
+    int maxIters = cluster.isDistributedCluster() ? 15 : 5;
+
+    // Lists to keep track of times.
+    List<Long> timesPutMs = new ArrayList<Long>(maxIters);
+    List<Long> timesScanMs = new ArrayList<Long>(maxIters);
+    long start = System.nanoTime();
+
+    // We're going to try this multiple times
+    for (int fullIterations = 0; fullIterations < maxIters; fullIterations++) {
+      // Create and start executing a callable that will kill the servers
+      Future<Boolean> restartFuture = executorService.submit(new RestartRSCallable());
+
+      // Pass that future to the scan and put callables.
+      Future<Long> putFuture = executorService.submit(new PutCallable(restartFuture));
+      Future<Long> scanFuture = executorService.submit(new ScanCallable(restartFuture));
+      Future<Boolean> loadFuture = executorService.submit(new LoadCallable());
+
+      restartFuture.get();
+      loadFuture.get();
+
+      // Get the values from the futures.
+      long putTime = TimeUnit.MILLISECONDS.convert(putFuture.get(), TimeUnit.NANOSECONDS);
+      long scanTime = TimeUnit.MILLISECONDS.convert(scanFuture.get(), TimeUnit.NANOSECONDS);
+
+      // Store the times to display later.
+      timesPutMs.add(putTime);
+      timesScanMs.add(scanTime);
+
+      // Wait some time for everything to settle down.
+      Thread.sleep(5000L);
+    }
+
+    long runtimeMs =
+        TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+
+    Objects.ToStringHelper helper = Objects.toStringHelper("MTTRResults")
+        .add("timesPutMs", timesPutMs)
+        .add("timesScanMs", timesScanMs)
+        .add("totalRuntimeMs", runtimeMs);
+
+    // Log the info
+    LOG.info(helper.toString());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // Set up the integration test
+    this.util = getTestingUtil(null);
+    // Make sure there are three servers.
+    this.util.initializeCluster(3);
+    // Create access to the cluster to see if it's distributed.
+    this.cluster = util.getHBaseCluster();
+
+    // Create executor with enough threads to restart rs's, run scans, puts, and load test tool.
+    this.executorService = Executors.newFixedThreadPool(4);
+
+    // Get the table name.
+    this.tableName = util.getConfiguration()
+        .get("hbase.IntegrationTestMTTR.tableName", "IntegrationTestMTTR");
+    this.tableNameBytes = Bytes.toBytes(tableName);
+
+    this.loadTableName = util.getConfiguration()
+        .get("hbase.IntegrationTestMTTR.loadTableName", "IntegrationTestMTTRLoadTestTool");
+
+    if (util.getHBaseAdmin().tableExists(tableNameBytes)) {
+      util.deleteTable(tableNameBytes);
+    }
+    // Create the table. If this fails then fail everything.
+    util.createTable(tableNameBytes, FAMILY);
+
+    // Set up the action that will restart a region server holding a region from our table
+    // because this table should only have one region we should be good.
+    this.restartRSAction = new ChaosMonkey.RestartRsHoldingTable(60 * 1000L, tableName);
+
+    // Give the action the access to the cluster.
+    restartRSAction.init(new ChaosMonkey.ActionContext(util));
+
+    // Set up the load test tool.
+    this.loadTool = new LoadTestTool();
+    loadTool.setConf(util.getConfiguration());
+    int ret = loadTool.run(new String[] { "-tn", loadTableName, "-init_only" });
+    assertEquals("Failed to initialize LoadTestTool", 0, ret);
+  }
+
+  @After
+  public void after() throws IOException {
+    // Clean everything up. setUp() may have failed part-way, so guard against unset fields,
+    // and shut the executor down even if restoring the cluster throws.
+    try {
+      if (util != null) {
+        util.restoreCluster();
+      }
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+      util = null;
+      cluster = null;
+      executorService = null;
+      restartRSAction = null;
+      loadTool = null;
+    }
+  }
+
+  /**
+   * Create a testing util from a conf.
+   * @param conf the conf. Can be null.
+   * @return an IntegrationTestingUtility. Could be new or re-used.
+   */
+  private IntegrationTestingUtility getTestingUtil(Configuration conf) {
+    if (this.util == null) {
+      if (conf == null) {
+        this.util = new IntegrationTestingUtility();
+      } else {
+        this.util = new IntegrationTestingUtility(conf);
+      }
+    }
+    return util;
+  }
+
+  /**
+   * Callable that will keep putting small amounts of data into a table
+   * until the future supplied returns.  It keeps track of the max time.
+   */
+  public class PutCallable implements Callable<Long> {
+
+    private final Future<?> future;
+    private final HTable table;
+
+    public PutCallable(Future<?> f) throws IOException {
+      this.future = f;
+      this.table = new HTable(util.getConfiguration(), tableName);
+    }
+
+    @Override
+    public Long call() throws Exception {
+      long maxTime = 0;
+      int numAfterDone = 0;
+      try {
+        // Keep trying until the rs is back up and enough puts have gone through
+        while (numAfterDone < SUCCESSES_AFTER_RESTART) {
+          Put p = new Put(Bytes.toBytes(RandomStringUtils.randomAlphanumeric(5)));
+          p.add(FAMILY, Bytes.toBytes("\0"), Bytes.toBytes(RandomStringUtils.randomAscii(5)));
+
+          // Time how long a put takes.
+          long start = System.nanoTime();
+          try {
+            table.put(p);
+            table.flushCommits();
+            if (future.isDone()) {
+              numAfterDone++;
+            }
+          } catch (Exception e) {
+            // Failures are expected while the region server is down; start counting again.
+            numAfterDone = 0;
+          }
+
+          long time = System.nanoTime() - start;
+          maxTime = Math.max(time, maxTime);
+        }
+      } finally {
+        // The table was opened by the constructor for this callable only; release it.
+        table.close();
+      }
+
+      return maxTime;
+    }
+  }
+
+  /**
+   * Callable that will keep scanning for small amounts of data until the
+   * supplied future returns.  Returns the max time taken to scan.
+   */
+  public class ScanCallable implements Callable<Long> {
+    private final Future<?> future;
+    private final HTable table;
+
+    public ScanCallable(Future<?> f) throws IOException {
+      this.future = f;
+      this.table = new HTable(util.getConfiguration(), tableName);
+    }
+
+    @Override
+    public Long call() throws Exception {
+      long maxTime = 0;
+      int numAfterDone = 0;
+      try {
+        // Keep trying until the rs is back up and enough scans have gone through
+        while (numAfterDone < SUCCESSES_AFTER_RESTART) {
+          Scan s = new Scan();
+          s.setBatch(2);
+          s.addFamily(FAMILY);
+
+          ResultScanner rs = null;
+          // Time how long a scan takes.
+          long start = System.nanoTime();
+          try {
+            rs = table.getScanner(s);
+            // next() returns null when the scan has no rows; that is "no data", not a failure.
+            Result result = rs.next();
+            if (result != null && result.size() != 0 && future.isDone()) {
+              numAfterDone++;
+            }
+          } catch (Exception e) {
+            // Failures are expected while the region server is down; start counting again.
+            numAfterDone = 0;
+          } finally {
+            if (rs != null) {
+              rs.close();
+            }
+          }
+
+          long time = System.nanoTime() - start;
+          maxTime = Math.max(time, maxTime);
+        }
+      } finally {
+        // The table was opened by the constructor for this callable only; release it.
+        table.close();
+      }
+
+      return maxTime;
+    }
+  }
+
+  /**
+   * Callable to restart the region server.
+   */
+  public class RestartRSCallable implements Callable<Boolean> {
+    @Override
+    public Boolean call() throws Exception {
+      restartRSAction.perform();
+      return true;
+    }
+  }
+
+  /**
+   * Callable used to make sure the cluster has some load on it.
+   */
+  public class LoadCallable implements Callable<Boolean> {
+
+    @Override
+    public Boolean call() throws Exception {
+      int colsPerKey = 10;
+      int recordSize = 500;
+      int numKeys = cluster.getInitialClusterStatus().getServersSize() * 5000;
+      int writeThreads = 10;
+
+      int ret = loadTool.run(new String[] {
+          "-tn", loadTableName,
+          "-write", String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads),
+          "-num_keys", String.valueOf(numKeys),
+          "-skip_init"
+      });
+      assertEquals("Load failed", 0, ret);
+
+      ret = loadTool.run(new String[] {
+          "-tn", loadTableName,
+          "-read", "100:10",
+          "-num_keys", String.valueOf(numKeys),
+          "-skip_init"
+      });
+      assertEquals("Verify failed", 0, ret);
+      return true;
+    }
+  }
+}
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java hbase-it/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java
index aa252ee..b95221c 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -110,10 +111,10 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
   /**
    * Context for Action's
    */
-  private static class ActionContext {
+  public static class ActionContext {
     private IntegrationTestingUtility util;
 
-    ActionContext(IntegrationTestingUtility util) {
+    public ActionContext(IntegrationTestingUtility util) {
       this.util = util;
     }
 
@@ -141,7 +142,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
     protected ServerName[] initialServers;
     protected Random random = new Random();
 
-    void init(ActionContext context) throws Exception {
+    public void init(ActionContext context) throws Exception {
       this.context = context;
       cluster = context.getHBaseCluster();
       initialStatus = cluster.getInitialClusterStatus();
@@ -149,7 +150,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
       initialServers = regionServers.toArray(new ServerName[regionServers.size()]);
     }
 
-    protected void perform() throws Exception { };
+    public void perform() throws Exception { };
 
     // TODO: perhaps these methods should be elsewhere?
     /** Returns current region servers */
@@ -254,7 +255,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
       super(sleepTime);
     }
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       LOG.info("Performing action: Restart active master");
 
       ServerName master = cluster.getClusterStatus().getMaster();
@@ -268,7 +269,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
     }
 
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       LOG.info("Performing action: Restart random region server");
       ServerName server = selectRandomItem(getCurrentServers());
 
@@ -276,12 +277,12 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
     }
   }
 
-  public static class RestartRsHoldingMeta extends RestartRandomRs {
+  public static class RestartRsHoldingMeta extends RestartActionBase {
     public RestartRsHoldingMeta(long sleepTime) {
       super(sleepTime);
     }
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       LOG.info("Performing action: Restart region server holding META");
       ServerName server = cluster.getServerHoldingMeta();
       if (server == null) {
@@ -292,6 +293,50 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
     }
   }
 
+  /**
+   * Restarts a randomly chosen region server that holds a region of the given table.
+   */
+  public static class RestartRsHoldingTable extends RestartActionBase {
+
+    private final String tableName;
+
+    public RestartRsHoldingTable(long sleepTime, String tableName) {
+      super(sleepTime);
+      this.tableName = tableName;
+    }
+
+    @Override
+    public void perform() throws Exception {
+      LOG.info("Performing action: Restart region server holding table " + tableName);
+
+      HTable table;
+      try {
+        table = new HTable(
+            context.getHaseIntegrationTestingUtility().getConfiguration(), tableName);
+      } catch (Exception e) {
+        // Best effort: nothing can be restarted without the table, but record why.
+        LOG.warn("Unable to open table " + tableName + ", not restarting anything", e);
+        return;
+      }
+
+      Collection<ServerName> serverNames;
+      try {
+        serverNames = table.getRegionLocations().values();
+      } finally {
+        table.close();
+      }
+
+      if (serverNames.isEmpty()) {
+        // random.nextInt(0) would throw, so there is nothing to pick from.
+        LOG.warn("No server is holding table " + tableName + " right now.");
+        return;
+      }
+      ServerName[] nameArray = serverNames.toArray(new ServerName[serverNames.size()]);
+
+      restartRs(nameArray[random.nextInt(nameArray.length)], sleepTime);
+    }
+  }
+
   /**
    * Restarts a ratio of the running regionservers at the same time
    */
@@ -304,7 +349,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
     }
 
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       LOG.info(String.format("Performing action: Batch restarting %d%% of region servers",
           (int)(ratio * 100)));
       List<ServerName> selectedServers = selectRandomItems(getCurrentServers(), ratio);
@@ -346,7 +391,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
     }
 
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       LOG.info(String.format("Performing action: Rolling batch restarting %d%% of region servers",
           (int)(ratio * 100)));
       List<ServerName> selectedServers = selectRandomItems(getCurrentServers(), ratio);
@@ -394,7 +439,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
     }
 
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       LOG.info("Unbalancing regions");
       ClusterStatus status = this.cluster.getClusterStatus();
       List<ServerName> victimServers = new LinkedList<ServerName>(status.getServers());
@@ -410,7 +455,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
 
   public static class ForceBalancerAction extends Action {
     @Override
-    protected void perform() throws Exception {
+    public void perform() throws Exception {
       LOG.info("Balancing regions");
       forceBalancer();
     }
-- 
1.7.10.2 (Apple Git-33)