diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMasterFailover.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMasterFailover.java
new file mode 100644
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMasterFailover.java
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HBaseFsck;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Integration test that performs random table operations (create, disable, enable, delete,
+ * column family changes and row inserts) from a pool of worker threads and selects the
+ * "masterKillingMonkey" chaos monkey for the run.
+ *
+ * <p>Every table the workers touch is tracked in one of three maps according to the state it is
+ * expected to be in. After each action, and once more when the run is over, the expected state
+ * is compared with what {@link Admin} reports. The run ends with an hbck pass over the cluster.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestMasterFailover extends IntegrationTestBase {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestMasterFailover.class);
+
+  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
+
+  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
+
+  protected static final int DEFAULT_NUM_THREADS = 20;
+
+  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables
+
+  protected HBaseCluster cluster;
+
+  protected Connection connection;
+
+  /**
+   * A soft limit on how long we should run
+   */
+  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
+  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
+  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";
+
+  protected AtomicBoolean running = new AtomicBoolean(true);
+
+  protected int numThreads, numRegions;
+
+  /** Tables the workers expect to be enabled. */
+  ConcurrentHashMap<TableName, HTableDescriptor> enabledTables =
+      new ConcurrentHashMap<TableName, HTableDescriptor>();
+
+  /** Tables the workers expect to be disabled. */
+  ConcurrentHashMap<TableName, HTableDescriptor> disabledTables =
+      new ConcurrentHashMap<TableName, HTableDescriptor>();
+
+  /** Tables the workers expect to have been deleted. */
+  ConcurrentHashMap<TableName, HTableDescriptor> deletedTables =
+      new ConcurrentHashMap<TableName, HTableDescriptor>();
+
+  @Override
+  public void setUpCluster() throws Exception {
+    util = getTestingUtil(getConf());
+    LOG.debug("Initializing/checking cluster has " + getMinServerCount() + " servers");
+    util.initializeCluster(getMinServerCount());
+    LOG.debug("Done initializing/checking cluster");
+    cluster = util.getHBaseClusterInterface();
+  }
+
+  /**
+   * Records the name of the chaos monkey this test wants.
+   *
+   * <p>NOTE(review): the original call to {@code super.setUpMonkey()} was left commented out, so
+   * nothing in this class builds or starts the monkey. Confirm whether that is intentional.
+   */
+  @Override
+  public void setUpMonkey() throws Exception {
+    this.monkeyToUse = "masterKillingMonkey";
+  }
+
+  /** Disables and deletes every table matching {@code ittable-\d+} before the base cleanup. */
+  @Override
+  public void cleanUpCluster() throws Exception {
+    Admin admin = util.getHBaseAdmin();
+    admin.disableTables("ittable-\\d+");
+    admin.deleteTables("ittable-\\d+");
+    super.cleanUpCluster();
+  }
+
+  protected int getMinServerCount() {
+    return SERVER_COUNT;
+  }
+
+  protected synchronized void setConnection(Connection connection) {
+    this.connection = connection;
+  }
+
+  /**
+   * Returns the shared connection, creating it from {@link #getConf()} on first use.
+   *
+   * @return the connection shared by the actions
+   * @throws IllegalStateException if the connection cannot be created
+   */
+  protected synchronized Connection getConnection() {
+    if (this.connection == null) {
+      try {
+        setConnection(ConnectionFactory.createConnection(getConf()));
+      } catch (IOException e) {
+        LOG.fatal("Failed to establish connection.", e);
+        // Handing back null would only surface later as a NullPointerException in an action.
+        throw new IllegalStateException("Failed to establish connection.", e);
+      }
+    }
+    return this.connection;
+  }
+
+  /**
+   * Checks that every tracked table is in the state its map says it should be in.
+   *
+   * @throws IOException if a table state cannot be read
+   */
+  protected void verifyTables() throws IOException {
+    // try-with-resources so the Admin is closed even when an assertion fails
+    try (Admin admin = getConnection().getAdmin()) {
+      for (TableName tableName : enabledTables.keySet()) {
+        Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
+            admin.isTableEnabled(tableName));
+      }
+      for (TableName tableName : disabledTables.keySet()) {
+        Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
+            admin.isTableDisabled(tableName));
+      }
+      for (TableName tableName : deletedTables.keySet()) {
+        Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
+            admin.tableExists(tableName));
+      }
+    }
+  }
+
+  @Test
+  public void testAsUnitTest() throws Exception {
+    runTest();
+  }
+
+  @Override
+  public int runTestFromCommandLine() throws Exception {
+    return runTest();
+  }
+
+  /** A single operation a worker can run through the shared connection. */
+  private abstract class MasterAction {
+    Connection connection = getConnection();
+
+    abstract void perform() throws IOException;
+  }
+
+  /** Base for actions that operate on one tracked table. */
+  private abstract class TableAction extends MasterAction {
+
+    /**
+     * Removes a random table from {@code tableMap} and returns its descriptor, so that no other
+     * worker can pick the same table while the caller is working on it.
+     *
+     * @param tableMap the tracking map to take a table from
+     * @return the descriptor of the chosen table, or {@code null} if the map is empty
+     */
+    protected HTableDescriptor selectTable(
+        ConcurrentHashMap<TableName, HTableDescriptor> tableMap) {
+      synchronized (tableMap) {
+        // Checked under the lock: another worker may have taken the last table in the meantime,
+        // and picking an index from an empty key list would fail.
+        if (tableMap.isEmpty()) {
+          return null;
+        }
+        ArrayList<TableName> tableList = new ArrayList<TableName>(tableMap.keySet());
+        TableName randomKey = tableList.get(RandomUtils.nextInt(tableList.size()));
+        return tableMap.remove(randomKey);
+      }
+    }
+  }
+
+  /** Creates a pre-split table with one randomly named column family. */
+  private class CreateTableAction extends TableAction {
+
+    @Override
+    void perform() throws IOException {
+      try (Admin admin = connection.getAdmin()) {
+        HTableDescriptor htd = createTableDesc();
+        TableName tableName = htd.getTableName();
+        if (admin.tableExists(tableName)) {
+          return;
+        }
+        // NOTE(review): "this" is the action here, so the key is built from the action's class
+        // name rather than the test's, unlike the runtime and thread-count keys. Confirm.
+        String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
+        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
+        byte[] startKey = Bytes.toBytes("row-0000000000");
+        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
+        LOG.info("Creating table:" + htd);
+        admin.createTable(htd, startKey, endKey, numRegions);
+        Assert.assertTrue("Table: " + htd + " was not created", admin.tableExists(tableName));
+        enabledTables.put(tableName, htd);
+        LOG.info("Created table:" + htd);
+      }
+      verifyTables();
+    }
+
+    private HTableDescriptor createTableDesc() {
+      String tableName = "ittable-" + Math.abs(RandomUtils.nextInt());
+      String familyName = "cf-" + Math.abs(RandomUtils.nextInt());
+      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+      // add random column family
+      htd.addFamily(new HColumnDescriptor(familyName));
+      return htd;
+    }
+  }
+
+  /** Disables a random table taken from {@code enabledTables}. */
+  private class DisableTableAction extends TableAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(enabledTables);
+      if (selected == null) {
+        return;
+      }
+
+      try (Admin admin = connection.getAdmin()) {
+        TableName tableName = selected.getTableName();
+        LOG.info("Disabling table :" + selected);
+        admin.disableTable(tableName);
+        Assert.assertTrue("Table: " + selected + " was not disabled",
+            admin.isTableDisabled(tableName));
+        disabledTables.put(tableName, selected);
+        LOG.info("Disabled table :" + selected);
+      }
+      verifyTables();
+    }
+  }
+
+  /** Enables a random table taken from {@code disabledTables}. */
+  private class EnableTableAction extends TableAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+        return;
+      }
+
+      try (Admin admin = connection.getAdmin()) {
+        TableName tableName = selected.getTableName();
+        LOG.info("Enabling table :" + selected);
+        admin.enableTable(tableName);
+        Assert.assertTrue("Table: " + selected + " was not enabled",
+            admin.isTableEnabled(tableName));
+        enabledTables.put(tableName, selected);
+        LOG.info("Enabled table :" + selected);
+      }
+      verifyTables();
+    }
+  }
+
+  /** Deletes a random table taken from {@code disabledTables}. */
+  private class DeleteTableAction extends TableAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+        return;
+      }
+
+      try (Admin admin = connection.getAdmin()) {
+        TableName tableName = selected.getTableName();
+        LOG.info("Deleting table :" + selected);
+        admin.deleteTable(tableName);
+        Assert.assertFalse("Table: " + selected + " was not deleted",
+            admin.tableExists(tableName));
+        deletedTables.put(tableName, selected);
+        LOG.info("Deleted table :" + selected);
+      }
+      verifyTables();
+    }
+  }
+
+  /** Base for actions that operate on one column family of a tracked table. */
+  private abstract class ColumnAction extends TableAction {
+
+    /**
+     * Picks a random column family of {@code htd}.
+     *
+     * @param htd the table to pick from, may be {@code null}
+     * @return the chosen family, or {@code null} if {@code htd} is {@code null} or has no families
+     */
+    protected HColumnDescriptor selectFamily(HTableDescriptor htd) {
+      if (htd == null) {
+        return null;
+      }
+      HColumnDescriptor[] families = htd.getColumnFamilies();
+      if (families.length == 0) {
+        LOG.info("No column families in table: " + htd);
+        return null;
+      }
+      return families[RandomUtils.nextInt(families.length)];
+    }
+  }
+
+  /** Adds a randomly named column family to a random table from {@code disabledTables}. */
+  private class AddColumnFamilyAction extends ColumnAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+        return;
+      }
+
+      try (Admin admin = connection.getAdmin()) {
+        HColumnDescriptor cfd = createFamilyDesc();
+        TableName tableName = selected.getTableName();
+        LOG.info("Adding column family: " + cfd + " to table: " + tableName);
+        admin.addColumn(tableName, cfd);
+        // "selected" is never handed to addColumn(), so the check has to run against a
+        // descriptor re-read through the Admin.
+        HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+        Assert.assertTrue("Column family: " + cfd + " was not added",
+            freshTableDesc.hasFamily(cfd.getName()));
+        LOG.info("Added column family: " + cfd + " to table: " + tableName);
+        disabledTables.put(tableName, freshTableDesc);
+      }
+      verifyTables();
+    }
+
+    private HColumnDescriptor createFamilyDesc() {
+      String familyName = "cf-" + Math.abs(RandomUtils.nextLong());
+      return new HColumnDescriptor(familyName);
+    }
+  }
+
+  /** Sets min and max versions of a random family in a random table from disabledTables. */
+  private class AlterFamilyVersionsAction extends ColumnAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+        return;
+      }
+      HColumnDescriptor columnDesc = selectFamily(selected);
+      if (columnDesc == null) {
+        // selectTable() removed the table from the map; put it back so it stays tracked
+        disabledTables.put(selected.getTableName(), selected);
+        return;
+      }
+
+      int versions = RandomUtils.nextInt(5) + 3;
+      try (Admin admin = connection.getAdmin()) {
+        TableName tableName = selected.getTableName();
+        LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions
+            + " in table: " + tableName);
+        columnDesc.setMaxVersions(versions);
+        columnDesc.setMinVersions(versions);
+        admin.modifyTable(tableName, selected);
+        // Compare against what the Admin reports rather than the local descriptor objects.
+        HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+        HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
+        Assert.assertEquals("Column family: " + columnDesc + " was not altered",
+            versions, freshColumnDesc.getMaxVersions());
+        Assert.assertEquals("Column family: " + columnDesc + " was not altered",
+            versions, freshColumnDesc.getMinVersions());
+        LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions
+            + " in table: " + tableName);
+        disabledTables.put(tableName, freshTableDesc);
+      }
+      verifyTables();
+    }
+  }
+
+  /** Changes the data block encoding of a random family in a table from disabledTables. */
+  private class AlterFamilyEncodingAction extends ColumnAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+        return;
+      }
+      HColumnDescriptor columnDesc = selectFamily(selected);
+      if (columnDesc == null) {
+        // selectTable() removed the table from the map; put it back so it stays tracked
+        disabledTables.put(selected.getTableName(), selected);
+        return;
+      }
+
+      try (Admin admin = connection.getAdmin()) {
+        TableName tableName = selected.getTableName();
+        // possible DataBlockEncoding ids
+        int[] possibleIds = {0, 2, 3, 4, 6};
+        short id = (short) possibleIds[RandomUtils.nextInt(possibleIds.length)];
+        LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id
+            + " in table: " + tableName);
+        columnDesc.setDataBlockEncoding(DataBlockEncoding.getEncodingById(id));
+        admin.modifyTable(tableName, selected);
+        // Compare against what the Admin reports rather than the local descriptor objects.
+        HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+        HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
+        Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
+            id, freshColumnDesc.getDataBlockEncoding().getId());
+        LOG.info("Altered encoding of column family: " + columnDesc + " to: " + id
+            + " in table: " + tableName);
+        disabledTables.put(tableName, freshTableDesc);
+      }
+      verifyTables();
+    }
+  }
+
+  /** Removes a random family from a table in disabledTables that has more than one family. */
+  private class DeleteColumnFamilyAction extends ColumnAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+        return;
+      }
+      HColumnDescriptor cfd = selectFamily(selected);
+      // A table's only family is left alone, since AddRowAction needs a family to write to.
+      // Presumably the server would refuse to drop the last family anyway - TODO confirm.
+      if (cfd == null || selected.getColumnFamilies().length < 2) {
+        // selectTable() removed the table from the map; put it back so it stays tracked
+        disabledTables.put(selected.getTableName(), selected);
+        return;
+      }
+
+      try (Admin admin = connection.getAdmin()) {
+        TableName tableName = selected.getTableName();
+        LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
+        admin.deleteColumn(tableName, cfd.getName());
+        // "selected" is never handed to deleteColumn(), so the check has to run against a
+        // descriptor re-read through the Admin.
+        HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+        Assert.assertFalse("Column family: " + cfd + " was not deleted",
+            freshTableDesc.hasFamily(cfd.getName()));
+        LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
+        disabledTables.put(tableName, freshTableDesc);
+      }
+      verifyTables();
+    }
+  }
+
+  /** Writes rows to a random table from enabledTables, to populate tables for hbck. */
+  private class AddRowAction extends ColumnAction {
+
+    @Override
+    void perform() throws IOException {
+      HTableDescriptor selected = selectTable(enabledTables);
+      if (selected == null) {
+        return;
+      }
+      TableName tableName = selected.getTableName();
+      if (selectFamily(selected) == null) {
+        // selectTable() removed the table from the map; put it back so it stays tracked
+        enabledTables.put(tableName, selected);
+        return;
+      }
+
+      try (Admin admin = connection.getAdmin();
+          Table table = connection.getTable(tableName)) {
+        int averageRows = 5;
+        // add ${averageRows} rows to each region on average
+        int numRows = averageRows * admin.getTableRegions(tableName).size();
+        LOG.info("Adding " + numRows + " rows to table: " + selected);
+        for (int i = 0; i < numRows; i++) {
+          // nextInt(Integer.MAX_VALUE)) to return positive numbers only
+          byte[] rowKey = Bytes.toBytes(
+              "row-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE)));
+          byte[] family = selectFamily(selected).getName();
+          byte[] qualifier = Bytes.toBytes("col-" + RandomUtils.nextInt(Integer.MAX_VALUE) % 10);
+          byte[] value = Bytes.toBytes("val-" + RandomStringUtils.randomAlphanumeric(10));
+          Put put = new Put(rowKey);
+          put.addColumn(family, qualifier, value);
+          if (LOG.isDebugEnabled()) {
+            // a byte[] formatted with %s prints its identity, so render the cells as strings
+            LOG.debug(String.format("Putting '%s', '%s', '%s:%s', '%s'", tableName,
+                Bytes.toString(rowKey), Bytes.toString(family), Bytes.toString(qualifier),
+                Bytes.toString(value)));
+          }
+          table.put(put);
+        }
+        enabledTables.put(tableName, selected);
+        LOG.info("Added " + numRows + " rows to table: " + selected);
+      }
+      verifyTables();
+    }
+  }
+
+  /** The operations a worker chooses from. */
+  private enum ACTION {
+    CREATE_TABLE,
+    DISABLE_TABLE,
+    ENABLE_TABLE,
+    DELETE_TABLE,
+    ADD_COLUMNFAMILY,
+    DELETE_COLUMNFAMILY,
+    ALTER_FAMILYVERSIONS,
+    ALTER_FAMILYENCODING,
+    ADD_ROW
+  }
+
+  /** Runs one random action per iteration until {@code running} is cleared or one fails. */
+  private class Worker extends Thread {
+
+    /** What ended this worker early, if anything; read by runTest() after join(). */
+    private Throwable savedFailure;
+
+    @Override
+    public void run() {
+      ACTION[] actions = ACTION.values();
+      while (running.get()) {
+        // select random action
+        ACTION selectedAction = actions[RandomUtils.nextInt(actions.length)];
+        LOG.info("Performing Action: " + selectedAction);
+
+        try {
+          switch (selectedAction) {
+            case CREATE_TABLE:
+              new CreateTableAction().perform();
+              break;
+            case DISABLE_TABLE:
+              new DisableTableAction().perform();
+              break;
+            case ENABLE_TABLE:
+              new EnableTableAction().perform();
+              break;
+            case DELETE_TABLE:
+              new DeleteTableAction().perform();
+              break;
+            case ADD_COLUMNFAMILY:
+              new AddColumnFamilyAction().perform();
+              break;
+            case DELETE_COLUMNFAMILY:
+              new DeleteColumnFamilyAction().perform();
+              break;
+            case ALTER_FAMILYVERSIONS:
+              new AlterFamilyVersionsAction().perform();
+              break;
+            case ALTER_FAMILYENCODING:
+              new AlterFamilyEncodingAction().perform();
+              break;
+            case ADD_ROW:
+              new AddRowAction().perform();
+              break;
+            default:
+              throw new IllegalStateException("Unhandled action: " + selectedAction);
+          }
+        } catch (Throwable t) {
+          // Throwable rather than Exception: a failed JUnit assertion is an AssertionError.
+          LOG.error("Action " + selectedAction + " failed in " + getName(), t);
+          this.savedFailure = t;
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs the workers for the configured time, then verifies table state and runs hbck.
+   *
+   * @return the hbck return code, which is asserted to be 0
+   */
+  private int runTest() throws Exception {
+    LOG.info("Starting the test");
+
+    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
+    long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
+
+    String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
+    numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
+
+    ArrayList<Worker> workers = new ArrayList<>();
+    for (int i = 0; i < numThreads; i++) {
+      Worker worker = new Worker();
+      LOG.info("Launching worker thread " + worker.getName());
+      workers.add(worker);
+      worker.start();
+    }
+
+    Threads.sleep(runtime);
+    LOG.info("Runtime is up");
+    running.set(false);
+
+    for (Worker worker : workers) {
+      worker.join();
+    }
+
+    // A failed action only ends its own thread, so it has to be surfaced here or the run
+    // would be reported as a success.
+    for (Worker worker : workers) {
+      if (worker.savedFailure != null) {
+        throw new AssertionError("Worker thread " + worker.getName() + " failed",
+            worker.savedFailure);
+      }
+    }
+
+    // verify
+    verifyTables();
+
+    // RUN HBCK
+    LOG.info("Running hbck");
+    HBaseFsck hbck = new HBaseFsck(util.getConfiguration());
+    int ret = hbck.onlineHbck();
+    Assert.assertEquals("hbck failed", 0, ret);
+    return ret;
+  }
+
+  @Override
+  public TableName getTablename() {
+    return null;
+  }
+
+  @Override
+  protected Set<String> getColumnFamilies() {
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    IntegrationTestMasterFailover masterFailover = new IntegrationTestMasterFailover();
+    // stays non-zero unless the tool itself reports a result
+    int ret = 1;
+    try {
+      // Initialize connection once, then pass to Actions
+      LOG.debug("Setting up connection ...");
+      masterFailover.setConnection(ConnectionFactory.createConnection(conf));
+      try {
+        ret = ToolRunner.run(conf, masterFailover, args);
+      } finally {
+        masterFailover.getConnection().close();
+      }
+    } catch (IOException e) {
+      LOG.fatal("Failed to establish connection. Aborting test ...", e);
+    }
+    System.exit(ret);
+  }
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MasterKillingMonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MasterKillingMonkeyFactory.java
new file mode 100644
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MasterKillingMonkeyFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos.factories;
+
+import org.apache.hadoop.hbase.chaos.actions.Action;
+import org.apache.hadoop.hbase.chaos.actions.DumpClusterStatusAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
+import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+
+/**
+ * Builds a {@link PolicyBasedChaosMonkey} from a {@link RestartActiveMasterAction} and a
+ * {@link DumpClusterStatusAction}, each under its own {@link PeriodicRandomActionPolicy}.
+ */
+public class MasterKillingMonkeyFactory extends MonkeyFactory {
+
+  private long action1Period;
+  private long action2Period;
+
+  private long restartActiveMasterSleepTime;
+
+  @Override
+  public ChaosMonkey build() {
+    loadProperties();
+
+    // Destructive actions to mess things around.
+    Action[] actions1 = new Action[] {
+        new RestartActiveMasterAction(restartActiveMasterSleepTime),
+    };
+
+    // Action to log more info for debugging
+    Action[] actions2 = new Action[] {
+        new DumpClusterStatusAction()
+    };
+
+    return new PolicyBasedChaosMonkey(util,
+        new PeriodicRandomActionPolicy(action1Period, actions1),
+        new PeriodicRandomActionPolicy(action2Period, actions2));
+  }
+
+  /** Reads the two periods and the restart sleep time, using MonkeyConstants defaults. */
+  private void loadProperties() {
+
+    action1Period = Long.parseLong(this.properties.getProperty(
+        MonkeyConstants.PERIODIC_ACTION1_PERIOD,
+        MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD + ""));
+    action2Period = Long.parseLong(this.properties.getProperty(
+        MonkeyConstants.PERIODIC_ACTION2_PERIOD,
+        MonkeyConstants.DEFAULT_PERIODIC_ACTION2_PERIOD + ""));
+    restartActiveMasterSleepTime = Long.parseLong(this.properties.getProperty(
+        MonkeyConstants.RESTART_ACTIVE_MASTER_SLEEP_TIME,
+        MonkeyConstants.DEFAULT_RESTART_ACTIVE_MASTER_SLEEP_TIME + ""));
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
index f4b1c53..c71339c 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
@@ -69,6 +69,7 @@ public abstract class MonkeyFactory {
   public static final String SERVER_KILLING = "serverKilling";
   public static final String STRESS_AM = "stressAM";
   public static final String NO_KILL = "noKill";
+  public static final String MASTER_MONKEY = "masterKillingMonkey";
 
   public static Map<String, MonkeyFactory> FACTORIES = ImmutableMap.<String,MonkeyFactory>builder()
     .put(CALM, new CalmMonkeyFactory())
@@ -77,6 +78,7 @@ public abstract class MonkeyFactory {
     .put(SERVER_KILLING, new ServerKillingMonkeyFactory())
     .put(STRESS_AM, new StressAssignmentManagerMonkeyFactory())
     .put(NO_KILL, new NoKillMonkeyFactory())
+    .put(MASTER_MONKEY, new MasterKillingMonkeyFactory())
     .build();
 
   public static MonkeyFactory getFactory(String factoryName) {