diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionBalanceAfterRSRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionBalanceAfterRSRestart.java
new file mode 100644
index 0000000..6e67e1d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionBalanceAfterRSRestart.java
@@ -0,0 +1,465 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/*
+ * This test verifies that a newly started region server gets assigned regions
+ * originally assigned to existing servers.
+ */
+@Category(LargeTests.class)
+public class TestRegionBalanceAfterRSRestart {
+  private static final Log LOG = LogFactory.getLog(TestRegionBalanceAfterRSRestart.class);
+  static {
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+  }
+
+  // Start a cluster with 1 master and 2 region servers
+  static final int NUM_MASTERS = 1;
+  static final int NUM_RS = 2;
+  final int NUM_REGIONS_TO_CREATE = 40;
+
+  MiniHBaseCluster cluster;
+  HMaster master;
+  Configuration conf;
+  static Configuration originalConf;
+  static HBaseTestingUtility TEST_UTIL;
+  static MiniDFSCluster dfsCluster;
+  static MiniZooKeeperCluster zkCluster;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
+    dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
+    zkCluster = TEST_UTIL.startMiniZKCluster();
+    originalConf = TEST_UTIL.getConfiguration();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
+  private void startCluster(int num_rs) throws Exception {
+    SplitLogCounters.resetCounters();
+    LOG.info("Starting cluster");
+    // do not resubmit failed split log tasks
+    conf.setLong("hbase.splitlog.max.resubmit", 0);
+    // Make the failure test faster
+    conf.setInt("zookeeper.recovery.retry", 0);
+    conf.setInt("hbase.balancer.period", 3000);
+    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
+    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
+    conf.setInt("hfile.format.version", 3);
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.setDFSCluster(dfsCluster);
+    TEST_UTIL.setZkCluster(zkCluster);
+    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
+    cluster = TEST_UTIL.getHBaseCluster();
+    LOG.info("Waiting for active/ready master");
+    cluster.waitForActiveAndReadyMaster();
+    master = cluster.getMaster();
+    while (cluster.getLiveRegionServerThreads().size() < num_rs) {
+      Threads.sleep(1);
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    // refresh configuration
+    conf = HBaseConfiguration.create(originalConf);
+  }
+
+  @After
+  public void after() throws Exception {
+    try {
+      if (TEST_UTIL.getHBaseCluster() != null) {
+        for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
+          mt.getMaster().abort("closing...", null);
+        }
+      }
+      TEST_UTIL.shutdownMiniHBaseCluster();
+    } finally {
+      TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
+      ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testRegionBalanceAfterMetaRSDown() throws Exception {
+    LOG.info("testRegionBalanceAfterMetaRSDown");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    final int NUM_LOG_LINES = 40;
+    final String tableName = "table";
+
+    boolean b = master.balanceSwitch(true);
+    assertTrue("Load balancer should be enabled", b);
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, tableName, "family", NUM_REGIONS_TO_CREATE);
+
+    verifyBalancedRegions(false);
+    HRegionServer hrs = findRSToKill(true, tableName);
+    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
+    makeHLog(hrs.getWAL(), regions, tableName, "family", NUM_LOG_LINES, 100);
+
+    this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
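+    // Bring up an additional region server; the balancer should move some of the
+    // recovered regions onto it, which is what verifyBalancedRegions(true) checks.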
+    cluster.startRegionServer();
+    verifyBalancedRegions(true);
+
+    ht.close();
+    zkw.close();
+  }
+
+  private void verifyBalancedRegions(boolean withNewServer) throws IOException {
+    List<RegionServerThread> threads = cluster.getRegionServerThreads();
+    int cnt = 0;
+    String tag = withNewServer ? " with new server" : " without new server";
+    while (true) {
+      boolean balanced = true;
+      if (++cnt % 10 == 0) {
+        LOG.info("In " + cnt + "th iteration" + tag);
+      }
+      for (RegionServerThread thread : threads) {
+        HRegionServer rs = thread.getRegionServer();
+        if (rs.isOnline() && !rs.isStopped()) {
+          List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(thread.getRegionServer());
+          // see if some regions are moved to the newly started server
+          if (regions.size() < NUM_REGIONS_TO_CREATE / 2 - 4) {
+            balanced = false;
+          }
+        }
+        Threads.sleep(100);
+      }
+      if (balanced) break;
+    }
+  }
+
+  private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw,
+      final int numRegions, final int numofLines) throws Exception {
+    abortRSAndWaitForRecovery(hrs, zkw, numRegions);
+    assertEquals(numofLines, TEST_UTIL.countRows(ht));
+  }
+
+  private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
+      final int numRegions) throws Exception {
+    final MiniHBaseCluster tmpCluster = this.cluster;
+
+    // abort the RS
+    LOG.info("Aborting region server: " + hrs.getServerName());
+    hrs.abort("testing");
+
+    // wait for the abort to complete
+    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+      }
+    });
+
+    // wait for regions to come online
+    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (getAllOnlineRegions(tmpCluster).size() >= (numRegions + 1));
+      }
+    });
+
+    // wait until all regions are fully recovered
+    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+            zkw.recoveringRegionsZNode, false);
+        return (recoveringRegions != null && recoveringRegions.size() == 0);
+      }
+    });
+  }
+
+  HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
+    return installTable(zkw, tname, fname, nrs, 0);
+  }
+
+  HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs,
+      int existingRegions) throws Exception {
+    // Create a table with regions
+    byte [] table = Bytes.toBytes(tname);
+    byte [] family = Bytes.toBytes(fname);
+    LOG.info("Creating table with " + nrs + " regions");
+    HTable ht = TEST_UTIL.createTable(table, family);
+    int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs);
+    assertEquals(nrs, numRegions);
+    LOG.info("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    // disable-enable cycle to get rid of the table's dead regions left behind
+    // by createMultiRegions
+    LOG.debug("Disabling table\n");
+    TEST_UTIL.getHBaseAdmin().disableTable(table);
+    LOG.debug("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    NavigableSet<String> regions = getAllOnlineRegions(cluster);
+    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
+    if (regions.size() != 2) {
+      for (String oregion : regions)
+        LOG.debug("Region still online: " + oregion);
+    }
+    assertEquals(2 + existingRegions, regions.size());
+    LOG.debug("Enabling table\n");
+    TEST_UTIL.getHBaseAdmin().enableTable(table);
+    LOG.debug("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
+    regions = getAllOnlineRegions(cluster);
+    assertEquals(numRegions + 2 + existingRegions, regions.size());
+    return ht;
+  }
+
+  void populateDataInTable(int nrows, String fname) throws Exception {
+    byte [] family = Bytes.toBytes(fname);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    assertEquals(NUM_RS, rsts.size());
+
+    for (RegionServerThread rst : rsts) {
+      HRegionServer hrs = rst.getRegionServer();
+      List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo hri : hris) {
+        if (hri.getTable().isSystemTable()) {
+          continue;
+        }
+        LOG.debug("adding data to rs = " + rst.getName() +
+            " region = " + hri.getRegionNameAsString());
+        HRegion region = hrs.getOnlineRegion(hri.getRegionName());
+        assertTrue(region != null);
+        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
+      }
+    }
+  }
+
+  public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
+      int num_edits, int edit_size) throws IOException {
+    makeHLog(log, regions, tname, fname, num_edits, edit_size, true);
+  }
+
+  public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
+      int num_edits, int edit_size, boolean closeLog) throws IOException {
+    TableName fullTName = TableName.valueOf(tname);
+    // remove root and meta region
+    regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
+    // using one sequenceId for edits across all regions is ok
+    final AtomicLong sequenceId = new AtomicLong(10);
+
+    for (Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext();) {
+      HRegionInfo regionInfo = iter.next();
+      if (regionInfo.getTable().isSystemTable()) {
+        iter.remove();
+      }
+    }
+    HTableDescriptor htd = new HTableDescriptor(fullTName);
+    byte[] family = Bytes.toBytes(fname);
+    htd.addFamily(new HColumnDescriptor(family));
+    byte[] value = new byte[edit_size];
+
+    List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
+    for (HRegionInfo region : regions) {
+      if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
+        continue;
+      }
+      hris.add(region);
+    }
+    LOG.info("Creating wal edits across " + hris.size() + " regions.");
+    for (int i = 0; i < edit_size; i++) {
+      value[i] = (byte) ('a' + (i % 26));
+    }
+    int n = hris.size();
+    int[] counts = new int[n];
+    if (n > 0) {
+      for (int i = 0; i < num_edits; i += 1) {
+        WALEdit e = new WALEdit();
+        HRegionInfo curRegionInfo = hris.get(i % n);
+        byte[] startRow = curRegionInfo.getStartKey();
+        if (startRow == null || startRow.length == 0) {
+          startRow = new byte[] { 0, 0, 0, 0, 1 };
+        }
+        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
+        // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+        row = Arrays.copyOfRange(row, 3, 8);
+        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+        e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
+        log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId);
+        counts[i % n] += 1;
+      }
+    }
+    log.sync();
+    if (closeLog) {
+      log.close();
+    }
+    for (int i = 0; i < n; i++) {
+      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
+    }
+  }
+
+  private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
+      throws KeeperException, InterruptedException {
+    ZKAssign.blockUntilNoRIT(zkw);
+    master.assignmentManager.waitUntilNoRegionsInTransition(60000);
+  }
+
+  private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
+      byte [] ...families) throws IOException {
+    for (int i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
+      for (byte [] family : families) {
+        put.add(family, qf, null);
+      }
+      region.put(put);
+    }
+  }
+
+  private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
+      throws IOException {
+    NavigableSet<String> online = new TreeSet<String>();
+    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+      for (HRegionInfo region : ProtobufUtil.getOnlineRegions(rst.getRegionServer())) {
+        online.add(region.getRegionNameAsString());
+      }
+    }
+    return online;
+  }
+
+  /**
+   * Find a RS that has regions of a table.
+   * @param hasMetaRegion when true, the returned RS also hosts the hbase:meta region
+   * @param tableName the table whose regions the returned RS must host
+   * @return a region server carrying regions of the given table
+   * @throws Exception
+   */
+  private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    int numOfRSs = rsts.size();
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+
+    for (int i = 0; i < numOfRSs; i++) {
+      boolean isCarryingMeta = false;
+      boolean foundTableRegion = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+        }
+        if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
+          foundTableRegion = true;
+        }
+        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
+          break;
+        }
+      }
+      if (isCarryingMeta && hasMetaRegion) {
+        // clients ask for a RS with META
+        if (!foundTableRegion) {
+          final HRegionServer destRS = hrs;
+          // the RS doesn't have regions of the specified table, so we need to move one to this RS
+          List<HRegionInfo> tableRegions =
+              TEST_UTIL.getHBaseAdmin().getTableRegions(Bytes.toBytes(tableName));
+          final HRegionInfo hri = tableRegions.get(0);
+          TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
+              Bytes.toBytes(destRS.getServerName().getServerName()));
+          // wait for the region move to complete
+          final RegionStates regionStates =
+              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
+            @Override
+            public boolean evaluate() throws Exception {
+              ServerName sn = regionStates.getRegionServerOfRegion(hri);
+              return (sn != null && sn.equals(destRS.getServerName()));
+            }
+          });
+        }
+        return hrs;
+      } else if (hasMetaRegion || isCarryingMeta) {
+        continue;
+      }
+      if (foundTableRegion) break;
+    }
+
+    return hrs;
+  }
+
+}