Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/IndexLoadBalancer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/IndexLoadBalancer.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/IndexLoadBalancer.java (working copy) @@ -0,0 +1,661 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.ReflectionUtils; + +/** + *

This class is a load balancer implementation that wraps a delegate load balancer. + * It co-locates the regions of a user table with the regions of the corresponding + * index table, if any.

+ * + * roundRobinAssignment, retainAssignment -> index regions will follow the actual table regions. + * randomAssignment, balanceCluster -> either the index table or the actual table region(s) will follow + * the other, based on whichever comes first.

+ * + *

In case of master failover there is a chance that the znodes of the index + * table and actual table are left behind. In that scenario we may get randomAssignment for + * either the actual table region first or the index table region first.

+ * + *

In case of balancing by table any table can balance first.

+ * + */ + +public class IndexLoadBalancer implements LoadBalancer { + + private static final Log LOG = LogFactory.getLog(IndexLoadBalancer.class); + + public static final byte[] PARENT_TABLE_KEY = Bytes.toBytes("PARENT_TABLE"); + + public static final String INDEX_BALANCER_DELEGATOR = + "hbase.index.balancer.delegator.class"; + + private LoadBalancer delegator; + + private MasterServices master; + + private Configuration conf; + + private ClusterStatus clusterStatus; + + private static final Random RANDOM = new Random(System.currentTimeMillis()); + + Map userTableVsIndexTable = new HashMap(); + + Map indexTableVsUserTable = new HashMap(); + + /** + * Maintains colocation information of user regions and corresponding index regions. + */ + private Map> colocationInfo = + new ConcurrentHashMap>(); + + private Set balancedTables = new HashSet(); + + private boolean stopped = false; + + @Override + public void initialize() throws HBaseIOException { + Class delegatorKlass = + conf.getClass(INDEX_BALANCER_DELEGATOR, StochasticLoadBalancer.class, + LoadBalancer.class); + this.delegator = ReflectionUtils.newInstance(delegatorKlass, conf); + this.delegator.setClusterStatus(clusterStatus); + this.delegator.setMasterServices(this.master); + this.delegator.initialize(); + try { + populateTablesToColocate(this.master.getTableDescriptors().getAll()); + } catch (IOException e) { + throw new HBaseIOException(e); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + } + + @Override + public void setClusterStatus(ClusterStatus st) { + this.clusterStatus = st; + } + + public Map> getColocationInfo() { + return colocationInfo; + } + + @Override + public void setMasterServices(MasterServices masterServices) { + this.master = masterServices; + } + + @Override + public List balanceCluster(Map> clusterState) + throws HBaseIOException { + synchronized 
(this.colocationInfo) { + boolean balanceByTable = conf.getBoolean("hbase.master.loadbalance.bytable", false); + List regionPlans = null; + + TableName tableName = null; + if (balanceByTable) { + Map tableKeys = null; + for (Entry> serverVsRegionList : clusterState.entrySet()) { + ServerName sn = serverVsRegionList.getKey(); + List regionInfos = serverVsRegionList.getValue(); + if (regionInfos.isEmpty()) { + continue; + } + if (!isTableColocated(regionInfos.get(0).getTable())) { + return this.delegator.balanceCluster(clusterState); + } + // Just get the table name from any one of the values in the regioninfo list + if (tableName == null) { + tableName = regionInfos.get(0).getTable(); + tableKeys = this.colocationInfo.get(tableName); + } + // Check and modify the colocation info map based on values of cluster state because we + // will + // call balancer only when the cluster is in stable and reliable state. + if (tableKeys != null) { + for (HRegionInfo hri : regionInfos) { + updateServer(tableKeys, sn, hri); + } + } + } + // If user table is already balanced find the index table plans from the user table plans + // or vice verca. 
+ TableName mappedTableName = getMappedTableToColocate(tableName); + if (balancedTables.contains(mappedTableName)) { + balancedTables.remove(mappedTableName); + regionPlans = new ArrayList(); + return prepareRegionPlansForClusterState(clusterState, regionPlans); + } else { + balancedTables.add(tableName); + regionPlans = this.delegator.balanceCluster(clusterState); + if (regionPlans == null) { + if (LOG.isDebugEnabled()) { + LOG.debug(tableName + " regions already balanced."); + } + return null; + } else { + updateRegionPlans(regionPlans); + return regionPlans; + } + } + + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Seperating user tables and index tables regions of " + + "each region server in the cluster."); + } + Map> userClusterState = + new HashMap>(); + Map> indexClusterState = + new HashMap>(); + for (Entry> serverVsRegionList : clusterState.entrySet()) { + ServerName sn = serverVsRegionList.getKey(); + List regionsInfos = serverVsRegionList.getValue(); + List idxRegionsToBeMoved = new ArrayList(); + List userRegionsToBeMoved = new ArrayList(); + for (HRegionInfo hri : regionsInfos) { + if (hri.isMetaRegion()) { + continue; + } + tableName = hri.getTable(); + // Check and modify the colocation info map based on values of cluster state because we + // will + // call balancer only when the cluster is in stable and reliable state. + if (isTableColocated(tableName)) { + // table name may change every time thats why always need to get table entries. 
+ Map tableKeys = + this.colocationInfo.get(tableName); + if (tableKeys != null) { + updateServer(tableKeys, sn, hri); + } + } + if (indexTableVsUserTable.containsKey(tableName)) { + idxRegionsToBeMoved.add(hri); + continue; + } + userRegionsToBeMoved.add(hri); + } + // there may be dummy entries here if assignments by table is set + userClusterState.put(sn, userRegionsToBeMoved); + indexClusterState.put(sn, idxRegionsToBeMoved); + } + + regionPlans = this.delegator.balanceCluster(userClusterState); + if (regionPlans == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("User region plan is null."); + } + regionPlans = new ArrayList(); + } else { + updateRegionPlans(regionPlans); + } + return prepareRegionPlansForClusterState(indexClusterState, regionPlans); + } + } + } + + private void updateServer(Map tableKeys, ServerName sn, + HRegionInfo hri) { + ImmutableBytesWritable startKey = new ImmutableBytesWritable(hri.getStartKey()); + ServerName existingServer = tableKeys.get(startKey); + if (!sn.equals(existingServer)) { + if (LOG.isDebugEnabled()) { + LOG.debug("There is a mismatch in the existing server name for the region " + hri + + ". Replacing the server " + existingServer + " with " + sn + "."); + } + tableKeys.put(startKey, sn); + } + } + + /** + * Prepare region plans for cluster state + * @param clusterState if balancing is table wise then cluster state contains only indexed or + * index table regions, otherwise it contains all index tables regions. 
+ * @param regionPlans + * @return + */ + private List prepareRegionPlansForClusterState( + Map> clusterState, List regionPlans) { + if (regionPlans == null) regionPlans = new ArrayList(); + ImmutableBytesWritable startKey = new ImmutableBytesWritable(); + for (Entry> serverVsRegionList : clusterState.entrySet()) { + List regionInfos = serverVsRegionList.getValue(); + ServerName server = serverVsRegionList.getKey(); + for (HRegionInfo regionInfo : regionInfos) { + if (!isTableColocated(regionInfo.getTable())) continue; + TableName mappedTableName = getMappedTableToColocate(regionInfo.getTable()); + startKey.set(regionInfo.getStartKey()); + ServerName sn = this.colocationInfo.get(mappedTableName).get(startKey); + if (sn.equals(server)) { + continue; + } else { + RegionPlan rp = new RegionPlan(regionInfo, server, sn); + if (LOG.isDebugEnabled()) { + LOG.debug("Selected server " + rp.getDestination() + " as destination for region " + + regionInfo.getRegionNameAsString() + " from colocation info."); + } + regionOnline(regionInfo, rp.getDestination()); + regionPlans.add(rp); + } + } + } + return regionPlans; + } + + private void updateRegionPlans(List regionPlans) { + for (RegionPlan regionPlan : regionPlans) { + HRegionInfo hri = regionPlan.getRegionInfo(); + if (!isTableColocated(hri.getTable())) continue; + if (LOG.isDebugEnabled()) { + LOG.debug("Saving region plan of region " + hri.getRegionNameAsString() + '.'); + } + regionOnline(hri, regionPlan.getDestination()); + } + } + + @Override + public Map> roundRobinAssignment(List regions, + List servers) throws HBaseIOException { + List userRegions = new ArrayList(); + List indexRegions = new ArrayList(); + for (HRegionInfo hri : regions) { + seperateUserAndIndexRegion(hri, userRegions, indexRegions); + } + Map> bulkPlan = null; + if (!userRegions.isEmpty()) { + bulkPlan = this.delegator.roundRobinAssignment(userRegions, servers); + // This should not happen. 
+ if (null == bulkPlan) { + if (LOG.isDebugEnabled()) { + LOG.debug("No region plans selected for user regions in roundRobinAssignment."); + } + return null; + } + savePlan(bulkPlan); + } + bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers); + return bulkPlan; + } + + private void seperateUserAndIndexRegion(HRegionInfo hri, List userRegions, + List indexRegions) { + if (indexTableVsUserTable.containsKey(hri.getTable())) { + indexRegions.add(hri); + return; + } + userRegions.add(hri); + } + + private Map> prepareIndexRegionsPlan( + List indexRegions, Map> bulkPlan, + List servers) throws HBaseIOException { + if (null != indexRegions && !indexRegions.isEmpty()) { + if (null == bulkPlan) { + bulkPlan = new ConcurrentHashMap>(); + } + for (HRegionInfo hri : indexRegions) { + if (LOG.isDebugEnabled()) { + LOG.debug("Preparing region plan for index region " + hri.getRegionNameAsString() + '.'); + } + ServerName destServer = getDestServerForIdxRegion(hri); + List destServerRegions = null; + if (destServer == null) destServer = this.randomAssignment(hri, servers); + if (destServer != null) { + destServerRegions = bulkPlan.get(destServer); + if (null == destServerRegions) { + destServerRegions = new ArrayList(); + bulkPlan.put(destServer, destServerRegions); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Server " + destServer + " selected for region " + + hri.getRegionNameAsString() + '.'); + } + destServerRegions.add(hri); + regionOnline(hri, destServer); + } + } + } + return bulkPlan; + } + + private ServerName getDestServerForIdxRegion(HRegionInfo hri) { + // Every time we calculate the table name because in case of master restart the index regions + // may be coming for different index tables. 
+ TableName actualTable = getMappedTableToColocate(hri.getTable()); + synchronized (this.colocationInfo) { + Map regionMap = colocationInfo.get(actualTable); + if (null == regionMap) { + // Can this case come + return null; + } + if (regionMap.containsKey(hri.getStartKey())) { + // put index region location if corresponding user region found in regionLocation map. + ServerName sn = regionMap.get(hri.getStartKey()); + regionOnline(hri, sn); + return sn; + } + } + return null; + } + + private void savePlan(Map> bulkPlan) { + synchronized (this.colocationInfo) { + for (Entry> e : bulkPlan.entrySet()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Saving user regions' plans for server " + e.getKey() + '.'); + } + for (HRegionInfo hri : e.getValue()) { + if (!isTableColocated(hri.getTable())) continue; + regionOnline(hri, e.getKey()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Saved user regions' plans for server " + e.getKey() + '.'); + } + } + } + } + + @Override + public Map> retainAssignment(Map regions, + List servers) throws HBaseIOException { + Map userRegionsMap = new ConcurrentHashMap(); + List indexRegions = new ArrayList(); + for (Entry e : regions.entrySet()) { + seperateUserAndIndexRegion(e, userRegionsMap, indexRegions, servers); + } + Map> bulkPlan = null; + if (!userRegionsMap.isEmpty()) { + bulkPlan = this.delegator.retainAssignment(userRegionsMap, servers); + if (bulkPlan == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Empty region plan for user regions."); + } + return null; + } + savePlan(bulkPlan); + } + bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers); + return bulkPlan; + } + + private void seperateUserAndIndexRegion(Entry e, + Map userRegionsMap, List indexRegions, + List servers) { + HRegionInfo hri = e.getKey(); + if (indexTableVsUserTable.containsKey(hri.getTable())) { + indexRegions.add(hri); + return; + } + if (e.getValue() == null) { + userRegionsMap.put(hri, servers.get(RANDOM.nextInt(servers.size()))); + } else 
{ + userRegionsMap.put(hri, e.getValue()); + } + } + + @Override + public Map immediateAssignment(List regions, + List servers) throws HBaseIOException { + return this.delegator.immediateAssignment(regions, servers); + } + + @Override + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) + throws HBaseIOException { + if (!isTableColocated(regionInfo.getTable())) { + return this.delegator.randomAssignment(regionInfo, servers); + } + ServerName sn = getServerNameFromMap(regionInfo, servers); + if (sn == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No server found for region " + regionInfo.getRegionNameAsString() + '.'); + } + sn = getRandomServer(regionInfo, servers); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Destination server for region " + regionInfo.getRegionNameAsString() + " is " + + ((sn == null) ? "null" : sn.toString()) + '.'); + } + return sn; + } + + private ServerName getRandomServer(HRegionInfo regionInfo, List servers) + throws HBaseIOException { + ServerName sn = null; + sn = this.delegator.randomAssignment(regionInfo, servers); + if (sn == null) return null; + regionOnline(regionInfo, sn); + return sn; + } + + private ServerName getServerNameFromMap(HRegionInfo regionInfo, List onlineServers) { + TableName tableName = regionInfo.getTable(); + TableName mappedTable = getMappedTableToColocate(regionInfo.getTable()); + ImmutableBytesWritable startKey = new ImmutableBytesWritable(regionInfo.getStartKey()); + synchronized (this.colocationInfo) { + Map correspondingTableKeys = + this.colocationInfo.get(mappedTable); + Map actualTableKeys = this.colocationInfo.get(tableName); + + if (null != correspondingTableKeys) { + if (correspondingTableKeys.containsKey(startKey)) { + ServerName previousServer = null; + if (null != actualTableKeys) { + previousServer = actualTableKeys.get(startKey); + } + ServerName sn = correspondingTableKeys.get(startKey); + if (null != previousServer) { + // if servername of index region and user region 
are same in colocationInfo clean + // previous plans and return null + if (previousServer.equals(sn)) { + correspondingTableKeys.remove(startKey); + actualTableKeys.remove(startKey); + if (LOG.isDebugEnabled()) { + LOG.debug("Both user region plan and corresponding index region plan " + + "in colocation info are same. Hence clearing the plans to select new plan" + + " for the region " + regionInfo.getRegionNameAsString() + "."); + } + return null; + } + } + if (sn != null && onlineServers.contains(sn)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating the region plan of the region " + + regionInfo.getRegionNameAsString() + " with server " + sn); + } + regionOnline(regionInfo, sn); + return sn; + } else if (sn != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("The location " + sn + " of region with start key" + + Bytes.toStringBinary(regionInfo.getStartKey()) + + " is not in online. Selecting other region server."); + } + return null; + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No region plans in colocationInfo for table " + mappedTable); + } + } + return null; + } + } + + @Override + public void regionOnline(HRegionInfo regionInfo, ServerName sn) { + TableName tableName = regionInfo.getTable(); + synchronized (this.colocationInfo) { + Map tabkeKeys = this.colocationInfo.get(tableName); + if (tabkeKeys == null) { + tabkeKeys = new ConcurrentHashMap(); + this.colocationInfo.put(tableName, tabkeKeys); + } + tabkeKeys.put(new ImmutableBytesWritable(regionInfo.getStartKey()), sn); + } + } + + public void clearTableRegionPlans(TableName tableName) { + if (LOG.isDebugEnabled()) { + LOG.debug("Clearing regions plans from colocationInfo for table " + tableName); + } + synchronized (this.colocationInfo) { + this.colocationInfo.remove(tableName); + } + } + + @Override + public void regionOffline(HRegionInfo regionInfo) { + TableName tableName = regionInfo.getTable(); + synchronized (this.colocationInfo) { + Map tableKeys = 
this.colocationInfo.get(tableName); + if (null == tableKeys) { + if (LOG.isDebugEnabled()) { + LOG.debug("No regions of table " + tableName + " in the colocationInfo."); + } + } else { + tableKeys.remove(new ImmutableBytesWritable(regionInfo.getStartKey())); + if (LOG.isDebugEnabled()) { + LOG.debug("The regioninfo " + regionInfo + " removed from the colocationInfo"); + } + } + } + } + + @Override + public boolean isStopped() { + return stopped; + } + + @Override + public void stop(String why) { + LOG.info("Load Balancer stop requested: " + why); + stopped = true; + } + + public void populateTablesToColocate(Map tableDescriptors) { + HTableDescriptor desc = null; + for (Entry entry : tableDescriptors.entrySet()) { + desc = entry.getValue(); + if (desc.getValue(PARENT_TABLE_KEY) != null) { + addTablesToColocate(TableName.valueOf(desc.getValue(PARENT_TABLE_KEY)), + desc.getTableName()); + } + } + } + + /** + * Add tables whose regions to co-locate. + * @param userTable + * @param indexTable + */ + public void addTablesToColocate(TableName userTable, TableName indexTable) { + userTableVsIndexTable.put(userTable, indexTable); + indexTableVsUserTable.put(indexTable, userTable); + } + + /** + * Removes the specified table and corresponding table from co-location. + * @param table + */ + public void removeTablesFromColocation(TableName table) { + TableName other = userTableVsIndexTable.remove(table); + if (other != null) { + indexTableVsUserTable.remove(other); + } else { + other = indexTableVsUserTable.remove(table); + if (other != null) userTableVsIndexTable.remove(table); + } + } + + /** + * Return mapped table to co-locate. + * @param tableName + * @return + */ + public TableName getMappedTableToColocate(TableName tableName) { + TableName other = userTableVsIndexTable.get(tableName); + return other == null ? 
indexTableVsUserTable.get(tableName) : other; + } + + public boolean isTableColocated(TableName table) { + return userTableVsIndexTable.containsKey(table) || indexTableVsUserTable.containsKey(table); + } + + /** + * Populates table's region locations into co-location info from master. + * @param table + */ + public void populateRegionLocations(TableName table) { + synchronized (this.colocationInfo) { + if (!isTableColocated(table)) { + throw new IllegalArgumentException("Specified table " + table + + " should be in one of the tables to co-locate."); + } + RegionStates regionStates = this.master.getAssignmentManager().getRegionStates(); + List onlineRegions = regionStates.getRegionsOfTable(table); + for (HRegionInfo hri : onlineRegions) { + regionOnline(hri, regionStates.getRegionServerOfRegion(hri)); + } + Map regionsInTransition = regionStates.getRegionsInTransition(); + for (RegionState regionState : regionsInTransition.values()) { + if (table.equals(regionState.getRegion().getTable()) && regionState.getServerName() != null) { + regionOnline(regionState.getRegion(), regionState.getServerName()); + } + } + } + } +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestIndexLoadBalancer.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestIndexLoadBalancer.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestIndexLoadBalancer.java (working copy) @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.TestSplitTransactionOnCluster.MockedRegionObserver; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import 
org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestIndexLoadBalancer { + + private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static HBaseAdmin admin = null; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + final int NUM_MASTERS = 1; + final int NUM_RS = 4; + Configuration conf = UTIL.getConfiguration(); + conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true); + conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MockedMasterObserver.class.getName()); + conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class, + LoadBalancer.class); + UTIL.startMiniCluster(NUM_MASTERS, NUM_RS); + admin = UTIL.getHBaseAdmin(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (admin != null) { + admin.disableTables(".*"); + admin.deleteTables(".*"); + admin.close(); + } + UTIL.shutdownMiniCluster(); + } + + @Test(timeout = 180000) + public void testRoundRobinAssignmentDuringIndexTableCreation() throws Exception { + MiniHBaseCluster cluster = UTIL.getHBaseCluster(); + HMaster master = cluster.getMaster(); + TableName tableName = TableName.valueOf("testRoundRobinAssignmentDuringIndexTableCreation"); + TableName indexTableName = + TableName.valueOf("testRoundRobinAssignmentDuringIndexTableCreation_index"); + createUserAndIndexTable(tableName, indexTableName); + boolean isRegionColocated = + checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString()); + assertTrue("User regions and index regions should colocate.", isRegionColocated); + } + + @Test(timeout = 180000) + public void testColocationAfterSplit() throws Exception { + MiniHBaseCluster cluster = UTIL.getHBaseCluster(); + HMaster master = cluster.getMaster(); + // Table names to make use of 
the + TableName tableName = TableName.valueOf("testSplitHooksBeforeAndAfterPONR_1"); + TableName indexTableName = TableName.valueOf("testSplitHooksBeforeAndAfterPONR_2"); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addCoprocessor(MockedRegionObserver.class.getName()); + htd.addFamily(new HColumnDescriptor("cf")); + char c = 'A'; + byte[][] split = new byte[20][]; + for (int i = 0; i < 20; i++) { + byte[] b = { (byte) c }; + split[i] = b; + c++; + } + admin.createTable(htd, split); + HTableDescriptor iHtd = new HTableDescriptor(indexTableName); + iHtd.addFamily(new HColumnDescriptor("cf")); + iHtd.setValue(IndexLoadBalancer.PARENT_TABLE_KEY, tableName.toBytes()); + admin.createTable(iHtd, split); + + // test put with the indexed column + + insertData(tableName); + insertData(indexTableName); + + admin.split(tableName.getNameAsString(), "c"); + List regionsOfUserTable = + master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); + + while (regionsOfUserTable.size() != 22) { + Thread.sleep(100); + regionsOfUserTable = + master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); + } + + List regionsOfIndexTable = + master.getAssignmentManager().getRegionStates().getRegionsOfTable(indexTableName); + + while (regionsOfIndexTable.size() != 22) { + Thread.sleep(100); + regionsOfIndexTable = + master.getAssignmentManager().getRegionStates().getRegionsOfTable(indexTableName); + } + boolean isRegionColocated = + checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString()); + assertTrue("User regions and index regions should colocate.", isRegionColocated); + } + + private void insertData(TableName tableName) throws IOException, InterruptedException { + HTable table = new HTable(admin.getConfiguration(), tableName); + Put p = new Put("a".getBytes()); + p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val")); + p.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2")); 
+    table.put(p);
+
+    Put p1 = new Put("b".getBytes());
+    p1.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+    p1.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+    table.put(p1);
+
+    Put p2 = new Put("c".getBytes());
+    p2.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+    p2.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+    table.put(p2);
+
+    Put p3 = new Put("c1".getBytes());
+    p3.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+    p3.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+    table.put(p3);
+
+    Put p4 = new Put("d".getBytes());
+    p4.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+    p4.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+    table.put(p4);
+    admin.flush(tableName.getNameAsString());
+  }
+
+  /**
+   * Disables and re-enables a user table and its index table with
+   * "hbase.master.enabletable.roundrobin" switched off, then asserts that the regions of
+   * both tables are still hosted on the same servers.
+   */
+  @Test(timeout = 180000)
+  public void testRandomAssignmentDuringIndexTableEnable() throws Exception {
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    HMaster master = cluster.getMaster();
+    master.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", false);
+    TableName tableName = TableName.valueOf("testRandomAssignmentDuringIndexTableEnable");
+    TableName indexTableName =
+        TableName.valueOf("testRandomAssignmentDuringIndexTableEnable_index");
+    createUserAndIndexTable(tableName, indexTableName);
+    admin.disableTable(tableName);
+    admin.disableTable(indexTableName);
+    admin.enableTable(tableName);
+    admin.enableTable(indexTableName);
+    boolean isRegionColocated =
+        checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString());
+    assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+  }
+
+  /**
+   * Creates a user/index table pair plus an unrelated table pre-split on the 12 keys
+   * 'A'..'L' while the balancer is switched off, re-enables the user table, then turns
+   * the balancer back on, triggers a balance run and asserts that user and index
+   * regions are colocated.
+   */
+  @Test(timeout = 180000)
+  public void testBalanceCluster() throws Exception {
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    HMaster master = cluster.getMaster();
+    master.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", false);
+    master.getConfiguration().setBoolean("hbase.master.startup.retainassign", false);
+    master.getConfiguration().setBoolean("hbase.master.loadbalance.bytable", false);
+
+    TableName tableName = TableName.valueOf("testBalanceCluster");
+    TableName indexTableName = TableName.valueOf("testBalanceCluster_index");
+    createUserAndIndexTable(tableName, indexTableName);
+    HTableDescriptor htd1 = new HTableDescriptor(TableName.valueOf("testBalanceCluster1"));
+    htd1.addFamily(new HColumnDescriptor("fam1"));
+    char c = 'A';
+    byte[][] split1 = new byte[12][];
+    for (int i = 0; i < 12; i++) {
+      byte[] b = { (byte) c };
+      split1[i] = b;
+      c++;
+    }
+    admin.setBalancerRunning(false, false);
+    admin.createTable(htd1, split1);
+    admin.disableTable(tableName);
+    admin.enableTable(tableName);
+    admin.setBalancerRunning(true, false);
+    admin.balancer();
+    boolean isRegionsColocated =
+        checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString());
+    assertTrue("User regions and index regions should colocate.", isRegionsColocated);
+  }
+
+  /**
+   * With "hbase.master.loadbalance.bytable" set to true, asserts colocation of user and
+   * index regions both before and after an explicit balance run.
+   */
+  @Test(timeout = 180000)
+  public void testBalanceByTable() throws Exception {
+    ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    HMaster master = cluster.getMaster();
+    master.getConfiguration().setBoolean("hbase.master.loadbalance.bytable", true);
+    TableName tableName = TableName.valueOf("testBalanceByTable");
+    TableName indexTableName = TableName.valueOf("testBalanceByTable_index");
+    createUserAndIndexTable(tableName, indexTableName);
+    // NOTE(review): htd1/split1 are built but never passed to admin.createTable(), unlike
+    // testBalanceCluster -- confirm whether the extra table was meant to be created here.
+    HTableDescriptor htd1 = new HTableDescriptor(TableName.valueOf("testBalanceByTable1"));
+    htd1.addFamily(new HColumnDescriptor("fam1"));
+    char c = 'A';
+    byte[][] split1 = new byte[12][];
+    for (int i = 0; i < 12; i++) {
+      byte[] b = { (byte) c };
+      split1[i] = b;
+      c++;
+    }
+    admin.disableTable(tableName);
+    admin.enableTable(tableName);
+    admin.setBalancerRunning(true, false);
+    boolean isRegionColocated =
+        checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString());
+    assertTrue("User regions and index regions should colocate.", isRegionColocated);
+    admin.balancer();
+    Thread.sleep(10000);
+    // Wait for every region move started by the balance run to finish before re-checking.
+    ZKAssign.blockUntilNoRIT(zkw);
+    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Threads.sleep(1000);
+    }
+    isRegionColocated =
+        checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString());
+    assertTrue("User regions and index regions should colocate.", isRegionColocated);
+  }
+
+  /**
+   * Aborts the region server at index 1, waits until dead-server processing and all
+   * region transitions have finished, then asserts that user and index regions are
+   * colocated.
+   */
+  @Test(timeout = 180000)
+  public void testRoundRobinAssignmentAfterRegionServerDown() throws Exception {
+    ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    HMaster master = cluster.getMaster();
+    TableName tableName = TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown");
+    TableName indexTableName =
+        TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown_index");
+    createUserAndIndexTable(tableName, indexTableName);
+    HRegionServer regionServer = cluster.getRegionServer(1);
+    regionServer.abort("Aborting to test random assignment after region server down");
+    while (master.getServerManager().areDeadServersInProgress()) {
+      Thread.sleep(1000);
+    }
+    ZKAssign.blockUntilNoRIT(zkw);
+    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Threads.sleep(1000);
+    }
+    boolean isRegionColocated =
+        checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString());
+    assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+  }
+
+  /**
+   * Restarts the mini HBase cluster with "hbase.master.startup.retainassign" set to true
+   * and asserts that user and index regions are colocated once no region is in
+   * transition any more.
+   */
+  @Test(timeout = 180000)
+  public void testRetainAssignmentDuringMasterStartUp() throws Exception {
+    ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    HMaster master = cluster.getMaster();
+    master.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
+    TableName tableName = TableName.valueOf("testRetainAssignmentDuringMasterStartUp");
+    TableName indexTableName = TableName.valueOf("testRetainAssignmentDuringMasterStartUp_index");
+    createUserAndIndexTable(tableName, indexTableName);
+    UTIL.shutdownMiniHBaseCluster();
+    UTIL.startMiniHBaseCluster(1, 4);
+    cluster = UTIL.getHBaseCluster();
+    master = cluster.getMaster();
+    // Replace the shared admin with one built from the restarted master's configuration.
+    if (admin != null) {
+      admin.close();
+      admin = new HBaseAdmin(master.getConfiguration());
+    }
+    ZKAssign.blockUntilNoRIT(zkw);
+    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Threads.sleep(1000);
+    }
+    boolean isRegionColocated =
+        checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString());
+    assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+  }
+
+  /**
+   * Restarts the mini HBase cluster with "hbase.master.startup.retainassign" set to false
+   * and asserts that user and index regions are colocated once no region is in
+   * transition any more.
+   */
+  @Test(timeout = 300000)
+  public void testRoundRobinAssignmentDuringMasterStartUp() throws Exception {
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    HMaster master = cluster.getMaster();
+    UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", false);
+
+    TableName tableName = TableName.valueOf("testRoundRobinAssignmentDuringMasterStartUp");
+    TableName indexTableName =
+        TableName.valueOf("testRoundRobinAssignmentDuringMasterStartUp_index");
+    createUserAndIndexTable(tableName, indexTableName);
+    UTIL.shutdownMiniHBaseCluster();
+    cluster.waitUntilShutDown();
+    UTIL.startMiniHBaseCluster(1, 4);
+    cluster = UTIL.getHBaseCluster();
+    // Replace the shared admin with one built from the restarted master's configuration.
+    if (admin != null) {
+      admin.close();
+      admin = new HBaseAdmin(cluster.getMaster().getConfiguration());
+    }
+    master = cluster.getMaster();
+    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Threads.sleep(1000);
+    }
+    boolean isRegionColocated =
+        checkForColocation(master, tableName.getNameAsString(), indexTableName.getNameAsString());
+    assertTrue("User regions and index regions should colocate.", isRegionColocated);
+  }
+
+  /**
+   * Creates a user table and its index table, both with column family "cf" and both
+   * pre-split on the same 20 single-byte keys 'A'..'T'. The index table descriptor
+   * carries the user table name under {@link IndexLoadBalancer#PARENT_TABLE_KEY}.
+   *
+   * @param tableName name of the user table to create
+   * @param indexTableName name of the index table to create
+   * @throws IOException if either table cannot be created
+   */
+  private void createUserAndIndexTable(TableName tableName, TableName indexTableName)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("cf"));
+    char c = 'A';
+    byte[][] split = new byte[20][];
+    for (int i = 0; i < 20; i++) {
+      byte[] b = { (byte) c };
+      split[i] = b;
+      c++;
+    }
+    admin.createTable(htd, split);
+    HTableDescriptor iHtd = new HTableDescriptor(indexTableName);
+    iHtd.addFamily(new HColumnDescriptor("cf"));
+    iHtd.setValue(IndexLoadBalancer.PARENT_TABLE_KEY, tableName.toBytes());
+    admin.createTable(iHtd, split);
+  }
+
+  /**
+   * Looks up the regions of a table and the servers hosting them through
+   * {@link MetaReader} and reduces each entry to its region start key and location.
+   *
+   * @param master master whose catalog tracker is used for the lookup
+   * @param tableName name of the table to inspect
+   * @return one (start key, server) pair per region, in the order MetaReader returned them
+   */
+  private List<Pair<byte[], ServerName>> getStartKeysAndLocations(HMaster master,
+      String tableName) throws IOException, InterruptedException {
+
+    List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations =
+        MetaReader.getTableRegionsAndLocations(master.getCatalogTracker(),
+          TableName.valueOf(tableName));
+    List<Pair<byte[], ServerName>> startKeyAndLocationPairs =
+        new ArrayList<Pair<byte[], ServerName>>(tableRegionsAndLocations.size());
+    for (Pair<HRegionInfo, ServerName> regionAndLocation : tableRegionsAndLocations) {
+      startKeyAndLocationPairs.add(new Pair<byte[], ServerName>(
+          regionAndLocation.getFirst().getStartKey(), regionAndLocation.getSecond()));
+    }
+    return startKeyAndLocationPairs;
+
+  }
+
+  /**
+   * Checks whether every region of the user table is hosted on the same server as the
+   * index table region at the same list position. The two tables must have the same
+   * number of regions, and regions at the same position must share a start key.
+   *
+   * @param master master used to look up region locations
+   * @param tableName name of the user table
+   * @param indexTableName name of the index table
+   * @return true if all regions pair up by start key and server, false otherwise
+   */
+  public boolean checkForColocation(HMaster master, String tableName, String indexTableName)
+      throws IOException, InterruptedException {
+    List<Pair<byte[], ServerName>> uTableStartKeysAndLocations =
+        getStartKeysAndLocations(master, tableName);
+    List<Pair<byte[], ServerName>> iTableStartKeysAndLocations =
+        getStartKeysAndLocations(master, indexTableName);
+
+    if (uTableStartKeysAndLocations.size() != iTableStartKeysAndLocations.size()) {
+      return false;
+    }
+    for (int i = 0; i < uTableStartKeysAndLocations.size(); i++) {
+      Pair<byte[], ServerName> uStartKeyAndLocation = uTableStartKeysAndLocations.get(i);
+      Pair<byte[], ServerName> iStartKeyAndLocation = iTableStartKeysAndLocations.get(i);
+
+      boolean sameStartKey =
+          Bytes.compareTo(uStartKeyAndLocation.getFirst(), iStartKeyAndLocation.getFirst()) == 0;
+      if (!sameStartKey
+          || !uStartKeyAndLocation.getSecond().equals(iStartKeyAndLocation.getSecond())) {
+        // A single mismatch decides the result; no need to look at the remaining regions.
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Master observer used by these tests to register index tables with the
+   * {@link IndexLoadBalancer} when they are created and to unregister tables when they
+   * are deleted. It does nothing if the master's balancer is not an IndexLoadBalancer.
+   */
+  public static class MockedMasterObserver extends BaseMasterObserver {
+    // Stays null when the master's balancer is not an IndexLoadBalancer.
+    IndexLoadBalancer balancer = null;
+
+    /** Captures the master's balancer if it is an {@link IndexLoadBalancer}. */
+    @Override
+    public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
+        throws IOException {
+      LoadBalancer loadBalancer =
+          ctx.getEnvironment().getMasterServices().getAssignmentManager().getBalancer();
+      if (loadBalancer instanceof IndexLoadBalancer) {
+        balancer = (IndexLoadBalancer) loadBalancer;
+      }
+      super.preMasterInitialization(ctx);
+    }
+
+    /**
+     * If the descriptor names a parent table under
+     * {@link IndexLoadBalancer#PARENT_TABLE_KEY}, registers the (parent, new table) pair
+     * with the balancer and populates the parent's region locations.
+     */
+    @Override
+    public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
+      if (balancer != null && desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
+        TableName userTableName =
+            TableName.valueOf(desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
+        balancer.addTablesToColocate(userTableName, desc.getTableName());
+        balancer.populateRegionLocations(userTableName);
+      }
+      super.preCreateTableHandler(ctx, desc, regions);
+    }
+
+    /** Removes a deleted table from the balancer's colocation bookkeeping, if present. */
+    @Override
+    public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        TableName tableName) throws IOException {
+      // balancer is only set for an IndexLoadBalancer; guard like preCreateTableHandler does.
+      if (balancer != null && balancer.isTableColocated(tableName)) {
+        balancer.removeTablesFromColocation(tableName);
+      }
+    }
+  }
+
+}