Index: src/test/java/org/apache/hadoop/hbase/master/group/TestGroup.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/group/TestGroup.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/master/group/TestGroup.java (revision 0) @@ -0,0 +1,464 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.group; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.master.GroupInfo; +import org.apache.hadoop.hbase.master.GroupInfoManager; +import org.apache.hadoop.hbase.master.GroupInfoManager.ServerPlan; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.RSGroupAssignmentManager; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Class to test region server groups. Spins up the minicluster once and test + * all function of group. 
+ */ +@Category(MediumTests.class) +public class TestGroup { + static HBaseAdmin admin = null; + static final int regionNumber = 9; + static final int serverNumber = 10; + private static String table1 = "table1"; + private static String table2 = "talbe2"; + private static String table3 = "talbe3"; + private static String table4 = "talbe4"; + private static String table5 = "talbe5"; + private static String group1 = "g1"; + private static String group2 = "g2"; + private static String group3 = "g3"; + private static String group4 = "g4"; + private static byte[][] startKeys; + static { + startKeys = new byte[regionNumber][]; + for (int i = 0; i < regionNumber; i++) { + startKeys[i] = Bytes.toBytes(String.format( + "%0" + ((regionNumber + "").length()) + "d", i)); + } + } + final static Log LOG = LogFactory.getLog(TestGroup.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static GroupInfoManager manager; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().set("hbase.master.info.port", "-1"); + TEST_UTIL.getConfiguration().set("hbase.regionserver.info.port", "-1"); + TEST_UTIL.getConfiguration().setBoolean(GroupInfoManager.GROUP_ENABLE_KEY, + true); + TEST_UTIL.startMiniCluster(serverNumber); + manager = ((RSGroupAssignmentManager) (TEST_UTIL.getMiniHBaseCluster() + .getMaster().getAssignmentManager())).getInfoManager(); + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + + try { + if (admin.tableExists(table1)) { + admin.disableTable(table1); + admin.deleteTable(table1); + } + des = new HTableDescriptor(table1); + des.addFamily(new HColumnDescriptor("ff")); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + if (admin.tableExists(table2)) { + admin.disableTable(table2); + admin.deleteTable(table2); + } + des = new HTableDescriptor(table2); + 
des.addFamily(new HColumnDescriptor("ff")); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + if (admin.tableExists(table3)) { + admin.disableTable(table3); + admin.deleteTable(table3); + } + des = new HTableDescriptor(table3); + des.addFamily(new HColumnDescriptor("ff")); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testAddAndRemoveGroup() throws IOException { + LOG.debug("groups:" + manager.getGroups()); + manager.addGroup(group1, true); + manager.addGroup(group2, true); + manager.addGroup(group3, true); + manager.addGroup(group4, true); + + LOG.debug("groups:" + manager.getGroups()); + assertTrue(manager.removeGroup(group4)); + assertFalse(manager.removeGroup(GroupInfo.DEFAULT_GROUP)); + + } + /** Plans a move of up to N default-group servers (never the META/ROOT hosts) into {@code group}, submits the plans and returns them. */ + private List moveServer(int N, String group) + throws InterruptedException, IOException { + Set onlineServers = TEST_UTIL.getHBaseCluster().getMaster() + .getServerManager().getOnlineServers().keySet(); + // collect up to N candidate servers for the target group; + List plans = new ArrayList(); + Set metaAndRoot = new HashSet(); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getMetaLocation()); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getRootLocation()); + int num = 0; + for (ServerName server : onlineServers) { + if (metaAndRoot.contains(server) + || !manager.getDefaultGroup().contains(server)) + continue; + plans.add(new ServerPlan(server, GroupInfo.DEFAULT_GROUP, group)); + num++; + if (num == N) + break; + } + LOG.debug(manager); + assertTrue(manager.moveServer(plans).size() == N); // expect as many results as servers requested + return plans; + } + + @Test + public void testAddAndRemoveServer() throws IOException, 
InterruptedException { + List plans = moveServer(3, group1); + Set metaAndRoot = new HashSet(); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getMetaLocation()); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getRootLocation()); + GroupInfo group = manager.getGroup(group1); + assertTrue(group.getServers().size() == 3); + // plans to move meta or root,should be abort. + for (ServerName server : metaAndRoot) { + plans.add(new ServerPlan(server, GroupInfo.DEFAULT_GROUP, group1)); + } + assertTrue(manager.moveServer(plans).size() == 0); + List hris = TEST_UTIL.getHBaseCluster().getMaster() + .getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table1)); + hris.addAll(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionsOfTable(Bytes.toBytes(table2))); + for (HRegionInfo hri : hris) { + assertFalse(group.getServers().contains( + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionServerOfRegion(hri))); + } + } + + @Test + public void testChangeTableGroupToEmptyGroup() throws IOException { + GroupInfoManager.changeTableGroup(TEST_UTIL.getConfiguration(), group2, + table1); + GroupInfo group = manager.getDefaultGroup(); + List hris = TEST_UTIL.getHBaseCluster().getMaster() + .getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table1)); + for (HRegionInfo hri : hris) { + assertTrue(group.getServers().contains( + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionServerOfRegion(hri))); + } + } + + @Test + public void testAddServerToEmptyGroup() throws IOException, + InterruptedException { + moveServer(3, group2); + GroupInfo group = manager.getGroup(group2); + List hris = TEST_UTIL.getHBaseCluster().getMaster() + .getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table1)); + for (HRegionInfo hri : hris) { + assertTrue(group.getServers().contains( + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + 
.getRegionServerOfRegion(hri))); + } + } + + @Test + public void testChangeTableGroup() throws IOException { + GroupInfoManager.changeTableGroup(TEST_UTIL.getConfiguration(), group1, + table1); + GroupInfo group = manager.getGroup(group1); + assertTrue(group.getServers().size() == 3); + List hris = TEST_UTIL.getHBaseCluster().getMaster() + .getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table1)); + for (HRegionInfo hri : hris) { + assertTrue(group.getServers().contains( + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionServerOfRegion(hri))); + } + } + + @Test + public void testAddNewTables() throws IOException { + HTableDescriptor des; + //Add a table without group attribute. + try { + if (admin.tableExists(table4)) { + admin.disableTable(table4); + admin.deleteTable(table4); + } + des = new HTableDescriptor(table4); + des.addFamily(new HColumnDescriptor("ff")); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + GroupInfo group = manager.getDefaultGroup(); + List hris = TEST_UTIL.getHBaseCluster().getMaster() + .getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table4)); + for (HRegionInfo hri : hris) { + assertTrue(group.getServers().contains( + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionServerOfRegion(hri))); + } + + //Add a table with group attribute. 
+ try { + if (admin.tableExists(table5)) { + admin.disableTable(table5); + admin.deleteTable(table5); + } + des = new HTableDescriptor(table5); + des.setValue(GroupInfo.GROUP_KEY, Bytes.toBytes(group1)); + des.addFamily(new HColumnDescriptor("ff")); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + group = manager.getGroup(group1); + hris = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionsOfTable(Bytes.toBytes(table5)); + for (HRegionInfo hri : hris) { + assertTrue(group.getServers().contains( + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionServerOfRegion(hri))); + } + } + + @Test + public void testRestart() throws IOException, InterruptedException { + int regionNum = manager.getRegionOfGroup(group1).size(); + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.restartHBaseCluster(serverNumber); + manager = ((RSGroupAssignmentManager) (TEST_UTIL.getMiniHBaseCluster() + .getMaster().getAssignmentManager())).getInfoManager(); + // Because after restarted the region servers' port number are not the same + // with the previous cluster. + // we need to move server again. 
+ moveServer(3, group1); + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + HTableDescriptor afterdes = admin.getTableDescriptor(Bytes.toBytes(table1)); + String groups = Bytes.toString(afterdes.getValue(GroupInfo.GROUP_KEY)); + assertTrue(group1.equals(groups)); + + int remainRegionNum = manager.getRegionOfGroup(group1).size(); + LOG.info("Formal regionnum " + regionNum + ";After restart find " + + remainRegionNum); + assertTrue(regionNum == remainRegionNum); + + GroupInfo group = manager.getGroup(group1); + List hris = TEST_UTIL.getHBaseCluster().getMaster() + .getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table1)); + for (HRegionInfo hri : hris) { + assertTrue(group.getServers().contains( + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionServerOfRegion(hri))); + } + + } + + @Test + public void testBalanceCluster() throws IOException, InterruptedException { + GroupInfoManager.changeTableGroup(TEST_UTIL.getConfiguration(), group4, + table1); + Set onlineServers = TEST_UTIL.getHBaseCluster().getMaster() + .getServerManager().getOnlineServers().keySet(); + // move four servers to group1; + List plans = new ArrayList(); + Set metaAndRoot = new HashSet(); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getMetaLocation()); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getRootLocation()); + int num = 0; + for (ServerName server : onlineServers) { + if (metaAndRoot.contains(server)) + continue; + plans.add(new ServerPlan(server, GroupInfo.DEFAULT_GROUP, group4)); + num++; + if (num == 4) + break; + } + manager.moveServer(plans); + TEST_UTIL.getMiniHBaseCluster().getMaster().balance(); + } + + @Test + public void testBalanceTableAndGroup() throws TableNotFoundException, + IOException, InterruptedException { + HTableDescriptor des = admin.getTableDescriptor(Bytes.toBytes(table1)); + // add 4 server to group4 + assertTrue(manager.addGroup(group4, true)); + List 
plans = new ArrayList(); + Set metaAndRoot = new HashSet(); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getMetaLocation()); + metaAndRoot.add(TEST_UTIL.getHBaseCluster().getMaster().getCatalogTracker() + .getRootLocation()); + int num = 0; + ServerName lastserver = null; + Set onlineServers = TEST_UTIL.getHBaseCluster().getMaster() + .getServerManager().getOnlineServers().keySet(); + for (ServerName server : onlineServers) { + if (metaAndRoot.contains(server)) + continue; + plans.add(new ServerPlan(server, GroupInfo.DEFAULT_GROUP, group4)); + num++; + lastserver = server; + if (num == 4) + break; + } + assertTrue(manager.moveServer(plans).size() == 4); + LOG.info("group4 server size is :" + + manager.getGroup(group4).getServers().size()); + // update table1 + try { + if (admin.tableExists(table1)) { + admin.disableTable(table1); + admin.deleteTable(table1); + } + des = new HTableDescriptor(table1); + des.addFamily(new HColumnDescriptor("ff")); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + // change table1 group to group4 + assertTrue(GroupInfoManager.changeTableGroup(TEST_UTIL.getConfiguration(), + group4, "table1")); + // remove one server from group4 + List newplans = new ArrayList(); + ServerPlan pl = new ServerPlan(lastserver, group4, GroupInfo.DEFAULT_GROUP); + newplans.add(pl); + manager.moveServer(newplans); + // balance table1 + assertTrue(manager.balanceTable("table1")); + List regions = TEST_UTIL.getMiniHBaseCluster().getMaster() + .getAssignmentManager().getRegionsOfTable(Bytes.toBytes("table1")); + LOG.info("All regions of table1 is :" + regions); + HashSet servers = new HashSet( + manager.getAvailableServer("table1")); + assertTrue(servers.size() == 3); + LOG.info("available table1 size " + servers.size()); + Map regionMap = new HashMap(); + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + ServerName server = null; + for (HRegionInfo info : regions) { + server = 
master.getAssignmentManager().getRegionServerOfRegion(info); + if (!regionMap.containsKey(server)) { + regionMap.put(server, 1); + } else { + Integer val = regionMap.get(server); + regionMap.remove(server); + regionMap.put(server, val + 1); + } + } + int avg = regions.size() / servers.size(); + LOG.info("Avg regionnum on server is " + avg + "; and map size is " + + regionMap.size()); + + for (Map.Entry entry : regionMap.entrySet()) { + LOG.info("Server " + entry.getKey() + "===>" + entry.getValue()); + } + // assertTrue(balance); + // add a new server to group4 + // remove one server from group4 + newplans = new ArrayList(); + pl = new ServerPlan(lastserver, GroupInfo.DEFAULT_GROUP, group4); + newplans.add(pl); + manager.moveServer(newplans); + servers = manager.getGroup(group4).getServers(); + assertTrue(servers.size() == 4); + // balance group4 + manager.balanceGroup(group4); + LOG.info("All regions of group4 is :" + manager.getRegionOfGroup(group4)); + LOG.info("group4 size is " + servers.size()); + regionMap = new HashMap(); + for (ServerName sn : servers) { + regionMap.put(sn, master.getAssignmentManager().getAssignments().get(sn) + .size()); + } + avg = regions.size() / servers.size(); + LOG.info("Avg regionnum on group4 is " + avg + "; and map size is " + + regionMap.size()); + + for (Map.Entry entry : regionMap.entrySet()) { + LOG.info("Server " + entry.getKey() + "===>" + entry.getValue()); + } + // assertTrue(balance); + } + +} Index: src/main/java/org/apache/hadoop/hbase/master/GroupInfo.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/GroupInfo.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/GroupInfo.java (revision 0) @@ -0,0 +1,301 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; + +/** + * Stores the group information of region server groups. + * Contains the group name,special configuration tag and servers of this group. + * The comparable implementation of this class compares only the group name + */ +public class GroupInfo extends Writables { + + //Set to store the region server configuration of this group. + private HashSet hostAndPorts = new HashSet(); + + //Set to store region servers which are actually assigned to this group. + private HashSet serverNames = new HashSet(); + + //Default group name. 
+ public static final String DEFAULT_GROUP = "0"; + + //The splitter used by table descriptor + public static final String GROUP_SPLITER = ","; + public static final String MOVEPLAN_SPLITER = "#"; + private boolean specialConf = false; + + public static final byte[] GROUP_KEY = Bytes.toBytes("_group"); + private String name = ""; + public GroupInfo() + { + + } + public GroupInfo(boolean defaultGroup) + { + if(defaultGroup) + { + this.name=DEFAULT_GROUP ; + } + } + + /** + * Get group name. + * @return + */ + public String getName() { + return name; + } + + /** + * Set the name of the group. + * @param name + */ + public void setName(String name) { + this.name = name; + } + + /** + * If the group use a special configuration. + * @return + */ + public boolean isSpecialConf() { + return specialConf; + } + + /** + * Tag whether the configuration of the group is special. + * @param specialConf + */ + public void setSpecialConf(boolean specialConf) { + this.specialConf = specialConf; + } + + /** + * If this group is the default group. + * @return + */ + public boolean isDefault() { + return this.name.equals(DEFAULT_GROUP); + } + + @Override + public boolean equals(Object info) + { + + if(info==null) + { + return false; + } + else + { + if(!(info instanceof GroupInfo)) + { + return false; + } + return ((GroupInfo)info).name.equals(this.name); + } + } + + @Override + public int hashCode() + { + return this.name.hashCode(); + } + + /** + * Add a server to this group. + * @param server server + * @param updateConf should update configuration + * @return + */ + public boolean add(ServerName server,boolean updateConf) { + if(updateConf) + { + this.hostAndPorts.add(server.getHostAndPort()); + } + int size=this.serverNames.size(); + this.serverNames.add(server); + return size==0&&this.serverNames.size()==1; + } + + + /** + * Return if the server is in this group. 
+ * @param server + * @return true if the server is in this group + */ + public boolean contains(ServerName server) { + return this.serverNames.contains(server); + } + + /** + * Whether this hostAndPort is in group's configuration. + * @param hostAndPort + * @return + */ + public boolean contains(String hostAndPort) { + return this.hostAndPorts.contains(hostAndPort); + } + + /** + * Get a copy of servers. + * @return + */ + public HashSet getServers() { + return new HashSet(this.serverNames); + } + + + /** + * Write the group out: name, special-conf flag, then the configured host:port entries. + * Persists hostAndPorts, the same set readFields restores, not the live serverNames. + * @param out stream to write to + * @throws IOException if writing to the stream fails + */ + private void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, Bytes.toBytes(name)); + out.writeBoolean(this.specialConf); + out.writeInt(this.hostAndPorts.size()); + for (String hostAndPort : hostAndPorts) { + Bytes.writeByteArray(out, Bytes.toBytes(hostAndPort)); + } + } + /** Read back name, special-conf flag and host:port entries in the order write emits them. */ + private void readFields(final DataInput in) throws IOException { + name = Bytes.toString(Bytes.readByteArray(in)); + this.specialConf = in.readBoolean(); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + String hostAndPort = Bytes.toString(Bytes.readByteArray(in)); + this.hostAndPorts.add(hostAndPort); + } + } + /** + * Read a list of GroupInfo. + * @param in DataInput + * @return + * @throws IOException + */ + public static List readGroups(final DataInput in) throws IOException + { + List groupList =new ArrayList(); + GroupInfo defaultGroup = null; + if (in != null) { + int size = in.readInt(); + for (int i = 0; i < size; i++) { + GroupInfo group = new GroupInfo(); + group.readFields(in); + groupList.add(group); + if (group.isDefault()) { + defaultGroup = group; + } + } + } + if (defaultGroup == null) { + defaultGroup = new GroupInfo(true); + groupList.add(defaultGroup); + } + return groupList; + } + + /** + * Write a list of group information out. 
+ * + * @param groups + * @param out + * @throws IOException + */ + public static void writeGroups(List groups, DataOutput out) + throws IOException { + out.writeInt(groups.size()); + for (GroupInfo group : groups) { + group.write(out); + } + } + + /** + * Remove a server from this group. + * @param server + * @param updateConf update the group configuration. + */ + public void remove(ServerName server,boolean updateConf) { + if(updateConf) + { + this.hostAndPorts.remove(server.getHostAndPort()); + } + this.serverNames.remove(server); + } + + /** + * Get group attribute from a table descriptor. + * @param des + * @return + */ + public static List getGroupList(HTableDescriptor des){ + String [] grouparray = getGroupString(des).split(GROUP_SPLITER); + List grouplist = new ArrayList(); + for(String str : grouparray){ + grouplist.add(str); + } + return grouplist; + } + + /** + * Get group attribute from a table descriptor. + * @param des + * @return + */ + public static String getGroupString(HTableDescriptor des) { + byte[] gbyte = des.getValue(GROUP_KEY); + if (gbyte != null) { + return Bytes.toString(des.getValue(GROUP_KEY)); + } else { + return DEFAULT_GROUP; + } + } + @Override + public String toString() + { + StringBuffer sb=new StringBuffer(); + sb.append("{name:"); + sb.append(this.name); + sb.append(" special conf"); + sb.append(this.specialConf); + sb.append(" hostAndPort:"); + sb.append(this.hostAndPorts); + sb.append(" Severs:"); + sb.append(this.serverNames +"}"); + return sb.toString(); + + } + +} Index: src/main/java/org/apache/hadoop/hbase/master/RSGroupAssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/RSGroupAssignmentManager.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/RSGroupAssignmentManager.java (revision 0) @@ -0,0 +1,158 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.zookeeper.KeeperException; + +/** + * Manager assignment of regions, according to region server group information. 
+ * + */ +public class RSGroupAssignmentManager extends AssignmentManager { + + MasterServices master; + private static final Log LOG = LogFactory + .getLog(RSGroupAssignmentManager.class); + private GroupInfoManager groupManager = null; + /** Creates the group manager for this master and hands it to the group load balancer. */ + public RSGroupAssignmentManager(Server master, ServerManager serverManager, + CatalogTracker catalogTracker, ExecutorService service) + throws KeeperException, IOException { + super(master, serverManager, catalogTracker, service); + this.master = (HMaster) master; + groupManager = new GroupInfoManager((HMaster) master); + ((GroupLoadBalancer)this.balancer).setManager(groupManager); + this.balancer.setMasterServices(this.master); + } + + public GroupInfoManager getInfoManager() { + return groupManager; + } + /** Chooses the plan for a region from its group's servers (default group for ROOT/META), minus draining and excluded servers; returns null when no server is left. */ + @Override + RegionPlan getRegionPlan(final RegionState state, + final ServerName serverToExclude, final boolean forceNewPlan) { + List servers; + boolean rootOrMeta = false; + if (state.getRegion().isMetaRegion() || state.getRegion().isRootRegion()) { + servers = new ArrayList(this.groupManager + .getDefaultGroup().getServers()); + rootOrMeta = true; + } else { + //Copy the servers available to the region's table; the list is pruned below. + servers = new ArrayList(this.groupManager.getAvailableServer( + state.getRegion().getTableNameAsString())); + } + final List drainingServers = master.getServerManager() + .getDrainingServersList(); + // Loop through the draining server list and remove them from the server + // list. 
+ if (!drainingServers.isEmpty()) { + for (final ServerName server : drainingServers) { + LOG.debug("Removing draining server: " + server + + " from eligible server pool."); + servers.remove(server); + } + } + String encodedName = state.getRegion().getEncodedName(); + // The remove below is safe: servers is a private copy made above, so the + // group manager's own collections are never modified here + if (serverToExclude != null) + servers.remove(serverToExclude); + if (servers.isEmpty()) { + LOG.error("Threre is no server to assign region:" + state.getRegion()); + return null; + } + + RegionPlan randomPlan = new RegionPlan(state.getRegion(), null, + balancer.randomAssignment(state.getRegion(),servers)); + boolean newPlan = false; + RegionPlan existingPlan = null; + + synchronized (this.regionPlans) { + existingPlan = this.regionPlans.get(encodedName); + if (existingPlan != null && existingPlan.getDestination() != null) { + LOG.debug("Found an existing plan for " + + state.getRegion().getRegionNameAsString() + + " destination server is + " + + existingPlan.getDestination().toString()); + } + + if (forceNewPlan || existingPlan == null + || existingPlan.getDestination() == null + || drainingServers.contains(existingPlan.getDestination()) + //Check if the existing plan conforms to the group assignment. 
+ || (!servers.contains(existingPlan.getDestination()))) { + newPlan = true; + this.regionPlans.put(encodedName, randomPlan); + } + } + if (newPlan) { + if (rootOrMeta) { // NOTE(review): routine outcome logged at ERROR for catalog regions - confirm intended level + LOG.error("No previous transition plan was found (or we are ignoring " + + "an existing plan) for " + + state.getRegion().getRegionNameAsString() + + " so generated a random one; " + randomPlan + "; " + + serverManager.countOfRegionServers() + " (online=" + + serverManager.getOnlineServers().size() + ", exclude=" + + drainingServers.size() + ") available servers"); + } + return randomPlan; + } + if (rootOrMeta) { + LOG.error("Using pre-existing plan for region " + + state.getRegion().getRegionNameAsString() + "; plan=" + + existingPlan); + } + return existingPlan; + } + + /** + * Assign regions,this method will lock the server movement. + * No server will be add to {@link GroupInfoManager#serversInTransition}. + * @param region + * @param setOfflineInZK + * @param forceNewPlan + * @param hijack + * - true new assignment is needed, false otherwise + */ + @Override + public void assign(HRegionInfo region, boolean setOfflineInZK, + boolean forceNewPlan, boolean hijack) { + this.groupManager.getMovementLock().lock(); + try { + super.assign(region, setOfflineInZK, forceNewPlan, hijack); + } finally { + this.groupManager.getMovementLock().unlock(); + } + } + +} Index: src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java (revision 1302790) +++ src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java (working copy) @@ -38,11 +38,15 @@ */ public static LoadBalancer getLoadBalancer(Configuration conf) { + Class balancerKlass; // Create the balancer - Class balancerKlass = conf.getClass( - HConstants.HBASE_MASTER_LOADBALANCER_CLASS, - DefaultLoadBalancer.class, LoadBalancer.class); + if (conf.getBoolean(GroupInfoManager.GROUP_ENABLE_KEY, 
false)) { + balancerKlass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + GroupLoadBalancer.class, LoadBalancer.class); + } else { + balancerKlass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + DefaultLoadBalancer.class, LoadBalancer.class); + } return ReflectionUtils.newInstance(balancerKlass, conf); - } } Index: src/main/java/org/apache/hadoop/hbase/master/GroupInfoManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/GroupInfoManager.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/GroupInfoManager.java (revision 0) @@ -0,0 +1,1083 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.DataInput; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Writables; + +/** + * + */ +public class GroupInfoManager extends Writables { + private static final Log LOG = LogFactory.getLog(GroupInfoManager.class); + /** The file name used to store group information in HDFS */ + public static final String GROUPINFO_NAME = ".groupinfo"; + public final static String GROUP_ENABLE_KEY="hbase.rsgroup.enable"; + private final FileSystem fs; + private HMaster master; + private Path path; + + //If true,regions could be assigned to the meta server. 
+  private static boolean assignToMetaServer;
+
+  // Region server group information: the key is the group name and the
+  // value is the GroupInfo instance registered under that name.
+  private ConcurrentHashMap<String, GroupInfo> groupMap;
+
+  // Servers which are about to be moved from one group to another, keyed by
+  // server name. Regions will not be assigned to these servers.
+  private ConcurrentHashMap<ServerName, ServerPlan> serversInTransition;
+
+  // Used to lock the movements map.
+  private final ReentrantLock movementTransitionLock = new ReentrantLock(true);
+
+  // Held by addServer() and removeGroup().
+  private final ReentrantLock deleteGroupLock = new ReentrantLock(true);
+
+  // Servers which have been put into a group already.
+  private ConcurrentSkipListSet<ServerName> assignedServer;
+
+  /**
+   * Creates the manager and loads the group configuration with
+   * {@link #readConfig()}.
+   * @param master the master this manager works for
+   * @throws IOException if the root directory, its file system or the stored
+   *           group configuration cannot be accessed
+   */
+  public GroupInfoManager(HMaster master) throws IOException {
+    this.master = master;
+    groupMap = new ConcurrentHashMap<String, GroupInfo>();
+    serversInTransition = new ConcurrentHashMap<ServerName, ServerPlan>();
+    assignedServer = new ConcurrentSkipListSet<ServerName>();
+    Configuration conf = master.getConfiguration();
+    Path rootDir = FSUtils.getRootDir(conf);
+    this.path = new Path(rootDir, GROUPINFO_NAME);
+    this.fs = rootDir.getFileSystem(conf);
+    assignToMetaServer = conf.getBoolean("hbase.group.assigntometaserver", true);
+    this.readConfig();
+  }
+
+  /**
+   * Remove a region server from the group which contains this server.
+   * Servers that were never registered through addServer() are ignored.
+   * @param server the server which will be removed.
+   * @param writeConf whether the group configuration is flushed to HDFS
+   */
+  public void removeServer(ServerName server, boolean writeConf) {
+    movementTransitionLock.lock();
+    try {
+      if (!assignedServer.contains(server)) {
+        return;
+      }
+      GroupInfo group = contains(server, this.groupMap.values());
+      if (group == null) {
+        LOG.error("Server:" + server + " not in group");
+      } else {
+        group.remove(server, writeConf);
+      }
+      // Forget the server before flushing: a failed write must not leave it
+      // marked as assigned while it is no longer a member of any group,
+      // because addServer() would then refuse to register it again.
+      this.assignedServer.remove(server);
+      if (writeConf) {
+        this.writeConfig();
+      }
+    } catch (IOException e) {
+      LOG.error("Write group configuration error !", e);
+    } finally {
+      movementTransitionLock.unlock();
+    }
+  }
+
+  /**
+   * Register a new server to its group.
+   * @param server the server to register
+   * @return the result of {@link #addServer(ServerName, String)} called with a
+   *         null target group
+   */
+  public boolean addServer(ServerName server) {
+    return addServer(server, null);
+  }
+
+  /**
+   * Register a new server and check the default group.
+   * If a group received its first server, the default group is checked and
+   * the regions which belong to that group are unassigned from it.
+   * A failure of the check is logged and not propagated.
+   * @param server the server to register
+   */
+  public void addServerAndCheckDefaultGroup(ServerName server) {
+    if (addServer(server)) {
+      try {
+        checkDefaultGroup();
+      } catch (IOException e) {
+        LOG.info("Check default group error ", e);
+      }
+    }
+  }
+
+  /**
+   * Register a new server and add it to its group.
+   *
+   * @param server the server to register
+   * @param targetGroup
+   *          which group this server is added to. If null (or not a known
+   *          group) the server is added according to the group configuration
+   *          and the configuration is neither updated nor flushed to HDFS,
+   *          because in that case this method is invoked by the ServerManager
+   *          and not by a user action. If the user moves a server from one
+   *          group to another, the group configuration is updated and flushed.
+   * @return true if a group received its first server and the default group
+   *         should be checked; false if the server was already registered
+   */
+  public boolean addServer(ServerName server, String targetGroup) {
+    boolean firstServer = false;
+    GroupInfo group = null;
+
+    // Lock the group to make sure the target group is not deleted meanwhile.
+    deleteGroupLock.lock();
+    try {
+      // A server which was assigned already must not be added twice.
+      if (assignedServer.contains(server)) {
+        return false;
+      }
+      if (targetGroup != null) {
+        group = this.getGroup(targetGroup);
+      }
+      if (group == null) {
+        // No explicit target: place the server according to the current
+        // configuration and leave the stored configuration untouched.
+        group = contains(server.getHostAndPort(), this.groupMap.values());
+        if (group == null) {
+          this.groupMap.get(GroupInfo.DEFAULT_GROUP).add(server, false);
+        } else {
+          firstServer = group.add(server, false);
+        }
+        this.assignedServer.add(server);
+      } else {
+        firstServer = group.add(server, true);
+        // Record the membership before flushing: if the write fails the
+        // server is in the group anyway and removeServer() must still see it.
+        this.assignedServer.add(server);
+        writeConfig();
+      }
+    } catch (IOException e) {
+      LOG.error("Write group configuration error !", e);
+    } finally {
+      deleteGroupLock.unlock();
+    }
+    return firstServer;
+  }
+
+  /**
+   * When there are groups which have no server, their regions are assigned to
+   * the default group. After these groups are not empty any more, check the
+   * default group and move their regions back.
+   * Does nothing until the master is initialized.
+   *
+   * @throws IOException
+   */
+  private void checkDefaultGroup() throws IOException {
+    if (!master.isInitialized()) {
+      return;
+    }
+    // Collect the non-default groups which own at least one server.
+    Set<GroupInfo> noEmptyGroup = new HashSet<GroupInfo>();
+    for (GroupInfo g : groupMap.values()) {
+      if ((!g.isDefault()) && g.getServers().size() != 0) {
+        noEmptyGroup.add(g);
+      }
+    }
+    GroupInfo defaultGroup = getDefaultGroup();
+    // getAssignments() returns a copy, so take one snapshot for all servers.
+    Map<ServerName, List<HRegionInfo>> assignments =
+        this.master.getAssignmentManager().getAssignments();
+    List<HRegionInfo> regionsToUnassign = new ArrayList<HRegionInfo>();
+    for (ServerName sn : defaultGroup.getServers()) {
+      // Check every server in the default group.
+      List<HRegionInfo> regions = assignments.get(sn);
+      if (regions == null) {
+        continue;
+      }
+      for (HRegionInfo region : regions) {
+        Set<GroupInfo> groupSet = getGroupInfo(region.getTableNameAsString());
+        if (groupSet.contains(defaultGroup)) {
+          // The table may live in the default group.
+          continue;
+        }
+        for (GroupInfo group : noEmptyGroup) {
+          if (groupSet.contains(group)) {
+            // One non-empty group is enough; stop so the region is queued
+            // only once even if several of its groups have servers.
+            regionsToUnassign.add(region);
+            break;
+          }
+        }
+      }
+    }
+
+    // Unassign regions.
+    if (regionsToUnassign.size() != 0) {
+      LOG.info("Moving " + regionsToUnassign.size() + " regions.");
+      BulkUnassigner bd = new BulkUnassigner(this.master, regionsToUnassign);
+      try {
+        bd.bulkAssign();
+      } catch (InterruptedException e) {
+        LOG.warn("Moving was interrupted");
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Write the configuration to HDFS.
+   * @throws IOException if the file cannot be created or written
+   */
+  private void writeConfig() throws IOException {
+    // Create the stream outside of the try block: if create() fails there is
+    // nothing to close and the original IOException reaches the caller.
+    FSDataOutputStream output = this.fs.create(this.path);
+    try {
+      GroupInfo.writeGroups(new ArrayList<GroupInfo>(this.groupMap.values()),
+          output);
+    } finally {
+      output.close();
+    }
+  }
+
+  /**
+   * Read group configuration from HDFS, only used when system
+   * starts up. Afterwards every online server is registered again.
+   *
+   * @throws IOException
+   */
+  private void readConfig() throws IOException {
+    List<GroupInfo> groupList;
+    // Stays null when no configuration file exists yet.
+    org.apache.hadoop.fs.FSDataInputStream in = null;
+    try {
+      if (fs.exists(path)) {
+        in = fs.open(path);
+      }
+      groupList = GroupInfo.readGroups(in);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+    for (GroupInfo group : groupList) {
+      groupMap.put(group.getName(), group);
+    }
+    this.assignedServer.clear();
+    List<ServerName> servers = master.getServerManager()
+        .getOnlineServersList();
+    for (ServerName server : servers) {
+      addServer(server);
+    }
+  }
+
+  ReentrantLock getMovementLock() {
+    return movementTransitionLock;
+  }
+
+  /**
+   * Get regions of a server
+   * @param server the name of the region server
+   * @return a new list with the regions assigned to the server, empty if the
+   *         assignment manager has no entry for it
+   */
+  public List<HRegionInfo> getRegionOfServer(ServerName server) {
+    List<HRegionInfo> infos = master.getAssignmentManager().getAssignments()
+        .get(server);
+    if (infos == null) {
+      return new ArrayList<HRegionInfo>();
+    }
+    return new ArrayList<HRegionInfo>(infos);
+  }
+
+  /**
+   * Get regions of a group.
+   *
+   * @param groupName the name of the group
+   * @return list of regions contained by this group
+   */
+  public List<HRegionInfo> getRegionOfGroup(String groupName) {
+    return getRegionsOfGroup(groupName);
+  }
+
+  /**
+   * Get tables of a region server
+   *
+   * @param server the name of the region server
+   * @return the distinct descriptors of the tables which have regions on the
+   *         server; tables whose descriptor cannot be read are left out
+   */
+  public List<HTableDescriptor> getTablesOfServer(ServerName server) {
+    Set<HTableDescriptor> set = new HashSet<HTableDescriptor>();
+    for (HRegionInfo region : getRegionOfServer(server)) {
+      try {
+        HTableDescriptor descriptor = master.getTableDescriptors()
+            .get(region.getTableName());
+        if (descriptor != null) {
+          set.add(descriptor);
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot read the descriptor of table "
+            + region.getTableNameAsString(), e);
+      }
+    }
+    return new ArrayList<HTableDescriptor>(set);
+  }
+
+  /**
+   * Get regions of a group
+   *
+   * @param groupName the name of the group
+   * @return list of regions this group contains, empty if the name is null
+   *         or no group is registered under exactly this name
+   */
+  public List<HRegionInfo> getRegionsOfGroup(String groupName) {
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    // Look the group up directly: groupExist() also accepts a list of names,
+    // for which the map lookup below would return null.
+    GroupInfo group = groupName == null ? null : groupMap.get(groupName);
+    if (group == null) {
+      return regions;
+    }
+    for (ServerName server : group.getServers()) {
+      regions.addAll(getRegionOfServer(server));
+    }
+    return regions;
+  }
+
+  /**
+   * Get tables of a group.
+   * @param groupName the name of the group
+   * @return the distinct names of the tables which have regions on the
+   *         servers of the group
+   */
+  public List<String> getTablesOfGroup(String groupName) {
+    Set<String> set = new HashSet<String>();
+    for (HRegionInfo region : getRegionsOfGroup(groupName)) {
+      set.add(region.getTableNameAsString());
+    }
+    return new ArrayList<String>(set);
+  }
+
+  /**
+   * Return the default group.
+   * @return the group registered under {@link GroupInfo#DEFAULT_GROUP}
+   */
+  public GroupInfo getDefaultGroup() {
+    return this.groupMap.get(GroupInfo.DEFAULT_GROUP);
+  }
+
+  /**
+   * @param groupName the name of the group
+   * @return the group registered under this name, or null if there is none
+   */
+  public GroupInfo getGroup(String groupName) {
+    return this.groupMap.get(groupName);
+  }
+
+  /**
+   * Find the group which contains the server defined
+   * by the hostAndPort string from the collection.
+   * @param hostAndPort host and port of the server
+   * @param collection the groups to search
+   * @return the first group which contains the server, or null
+   */
+  public static GroupInfo contains(String hostAndPort,
+      Collection<GroupInfo> collection) {
+    for (GroupInfo group : collection) {
+      if (group.contains(hostAndPort)) {
+        return group;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find the group which contains the server.
+   * @param server the server to look for
+   * @param collection the groups to search
+   * @return the first group which contains the server, or null
+   */
+  public static GroupInfo contains(ServerName server,
+      Collection<GroupInfo> collection) {
+    for (GroupInfo group : collection) {
+      if (group.contains(server)) {
+        return group;
+      }
+    }
+    return null;
+  }
+
+
+  /**
+   * If the server can find its group in group list,
+   * the server will be returned.
+   * @param servers the candidates
+   * @param groupList the groups a server has to be a member of
+   * @return the candidates which are contained in at least one of the groups,
+   *         each of them once, in the order of the input list
+   */
+  private List<ServerName> filter(List<ServerName> servers,
+      Set<GroupInfo> groupList) {
+    List<ServerName> ret = new ArrayList<ServerName>();
+    for (ServerName server : servers) {
+      for (GroupInfo group : groupList) {
+        if (group.contains(server)) {
+          ret.add(server);
+          // Stop at the first match so that a server which is a member of
+          // several of the groups is not returned more than once.
+          break;
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Resolve the groups named in the descriptor of a table.
+   * @param tableName the name of the table
+   * @return the known groups of the table; group names without a registered
+   *         group are skipped
+   */
+  private Set<GroupInfo> getGroupInfo(String tableName) {
+    HTableDescriptor des = null;
+    Set<GroupInfo> ret = new HashSet<GroupInfo>();
+    try {
+      des = master.getTableDescriptors().get(tableName);
+    } catch (TableExistsException e) {
+      LOG.error("Cannot read the descriptor of table " + tableName, e);
+      return new HashSet<GroupInfo>();
+    } catch (FileNotFoundException e) {
+      LOG.error("Cannot read the descriptor of table " + tableName, e);
+    } catch (IOException e) {
+      LOG.error("Cannot read the descriptor of table " + tableName, e);
+    }
+    // NOTE(review): des is still null here after the last two catch blocks;
+    // confirm that GroupInfo.getGroupList() accepts a null descriptor.
+    List<String> groupS = GroupInfo.getGroupList(des);
+    for (String s : groupS) {
+      // Single lookup: the group may be removed concurrently, and a second
+      // get() could then put null into the result.
+      GroupInfo group = groupMap.get(s);
+      if (group != null) {
+        ret.add(group);
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Get available servers from the list.
+   * @param servers the candidates
+   * @param table the name of the table
+   * @return the candidates which belong to a group of the table and pass
+   *         {@link #filterServers(List)}
+   */
+  public List<ServerName> getAvailableServer(List<ServerName> servers,
+      String table) {
+    Set<GroupInfo> groupList = getGroupInfo(table);
+    return filterServers(filter(servers, groupList));
+  }
+
+  /**
+   * Get servers which this table can use according to its group information
+   * @param tableName the name of the table
+   * @return the servers of the groups of the table, or the servers of the
+   *         default group if those groups have no server, after
+   *         {@link #filterServers(List)}
+   */
+  public List<ServerName> getAvailableServer(String tableName) {
+    Set<GroupInfo> groupList = getGroupInfo(tableName);
+    List<ServerName> ret = new ArrayList<ServerName>();
+    for (GroupInfo group : groupList) {
+      ret.addAll(group.getServers());
+    }
+    if (ret.size() == 0) {
+      ret.addAll(this.getDefaultGroup().getServers());
+    }
+    return filterServers(ret);
+  }
+
+  /**
+   * Check if the region and server are in the same group.
+   * @param regionInfo the region; its table decides the groups
+   * @param serverInfo the server
+   * @return true if one of the groups of the table contains the server
+   */
+  boolean inSameGroup(HRegionInfo regionInfo, ServerName serverInfo) {
+    Set<GroupInfo> groups = getGroupInfo(regionInfo.getTableNameAsString());
+    for (GroupInfo group : groups) {
+      if (group.contains(serverInfo)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Filter servers which are being moved from the list, if assignToMetaServer
+   * is not true,filter the meta server too.
+   *
+   * @param servers the candidates; this list is not modified
+   * @return a new list with the remaining servers
+   */
+  private List<ServerName> filterServers(
+      List<ServerName> servers) {
+    ServerName metaAddress = master.getCatalogTracker().getMetaLocation();
+    boolean skipMetaServer = !assignToMetaServer && metaAddress != null;
+    // Build a fresh list instead of removing from a list while iterating
+    // over it, which throws ConcurrentModificationException.
+    List<ServerName> ret = new ArrayList<ServerName>(servers.size());
+    for (ServerName server : servers) {
+      if (skipMetaServer && server.equals(metaAddress)) {
+        continue;
+      }
+      // containsKey(), not contains(): ConcurrentHashMap.contains() tests the
+      // values (the ServerPlans), so it never matched a server name.
+      if (this.serversInTransition.containsKey(server)) {
+        continue;
+      }
+      ret.add(server);
+    }
+    return ret;
+  }
+
+  /**
+   * Get group information
+   * @return a copy of groups, keyed by group name
+   */
+  public Map<String, GroupInfo> getGroups() {
+    return new HashMap<String, GroupInfo>(groupMap);
+  }
+
+  /**
+   * Check the groups,if there is one group of the string exists,
+   * return true.
+   * @param names one group name, or several names separated by
+   *          {@link GroupInfo#GROUP_SPLITER}
+   * @return true if at least one of the names is a registered group
+   */
+  boolean groupExist(String names) {
+    if (names == null) {
+      return false;
+    }
+    for (String s : names.split(GroupInfo.GROUP_SPLITER)) {
+      if (groupMap.get(s) != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Add a server plan to transition map
+   * @param sp plan to be added to {@link #serversInTransition}
+   * @return true if the plan was accepted; false if the server is moving
+   *         already, the target group is unknown or contains the server
+   *         already, the server hosts ROOT or META, or the server is the last
+   *         one in the default group
+   */
+  private boolean checkAndAddToMovingServers(ServerPlan sp) {
+    movementTransitionLock.lock();
+    try {
+      ServerName server = sp.getServername();
+      // containsKey(), not contains(): the legacy contains() tests values.
+      if (this.serversInTransition.containsKey(server)) {
+        return false;
+      }
+      // Look the target up directly; groupExist() also accepts a list of
+      // names, for which getGroup() would return null.
+      String targetName = sp.getTargetGroup();
+      GroupInfo target = targetName == null ? null : getGroup(targetName);
+      if (target == null || target.contains(server)) {
+        return false;
+      }
+      if (server.equals(master.getCatalogTracker().getRootLocation())
+          || server.equals(master.getCatalogTracker().getMetaLocation())) {
+        return false;
+      }
+      // The source group is stored as a name, so compare it with the name of
+      // the default group; comparing it with the GroupInfo never matched.
+      if (GroupInfo.DEFAULT_GROUP.equals(sp.getSourceGroup())
+          && getDefaultGroup().getServers().size() <= 1) {
+        return false;
+      }
+      this.serversInTransition.put(server, sp);
+    } catch (InterruptedException e) {
+      LOG.error("Get root location error:", e);
+      return false;
+    } finally {
+      movementTransitionLock.unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Removing sn form movement transition.
+   * @param server
+   *          ServerName to be remove from {@link #serversInTransition}
+   */
+  private void removeFromTransition(ServerName server) {
+    movementTransitionLock.lock();
+    try {
+      this.serversInTransition.remove(server);
+    } finally {
+      movementTransitionLock.unlock();
+    }
+  }
+
+  /**
+   * Delete a group,the group mustn't be default group or contain any servers
+   * @param groupName
+   *          The name of group you want to delete
+   * @return true if the group was deleted or did not exist; false if it is
+   *         the default group, still has servers, is the target of a moving
+   *         server, or the configuration could not be written
+   */
+  public boolean removeGroup(String groupName) {
+    // Take the lock before looking at the group, so that addServer() cannot
+    // put a server into it between the emptiness check and the removal.
+    deleteGroupLock.lock();
+    try {
+      GroupInfo group = groupMap.get(groupName);
+      if (group == null) {
+        return true;
+      }
+      if (group.isDefault() || group.getServers().size() != 0) {
+        return false;
+      }
+      // Check if the group is the target group of some moving servers.
+      for (ServerPlan plan : serversInTransition.values()) {
+        if (groupName.equals(plan.getTargetGroup())) {
+          return false;
+        }
+      }
+      try {
+        groupMap.remove(groupName);
+        writeConfig();
+        return true;
+      } catch (IOException e) {
+        LOG.error("Write group configuration error", e);
+        // Restore the group so that memory and HDFS stay in step.
+        groupMap.put(group.getName(), group);
+        return false;
+      }
+    } finally {
+      deleteGroupLock.unlock();
+    }
+  }
+
+  /**
+   * Add a new group.
+   * @param groupName
+   *          The name of group you want to add.
+   * @param specialConf
+   *          If configuration of this group is special
+   *          TODO:reconstruct the code of group configuration management.
+   * @return true if the group was added or exists already, false if the
+   *         configuration could not be written
+   */
+  public boolean addGroup(String groupName, boolean specialConf) {
+    if (groupMap.get(groupName) != null) {
+      return true;
+    }
+    GroupInfo g = new GroupInfo();
+    g.setName(groupName);
+    g.setSpecialConf(specialConf);
+    groupMap.put(g.getName(), g);
+    try {
+      writeConfig();
+      return true;
+    } catch (IOException e) {
+      LOG.error("Write group configuration error !", e);
+      // Roll back in memory only. Going through removeGroup() would write the
+      // configuration again and, on another failure, put the group back.
+      groupMap.remove(g.getName());
+      return false;
+    }
+  }
+
+  /**
+   * Balance the regions hosted by the given servers among these servers.
+   * Nothing is done for less than two servers.
+   * @param availableServers the servers of one group
+   */
+  private synchronized void balanceGroup(HashSet<ServerName> availableServers) {
+    if (availableServers == null || availableServers.size() < 2) {
+      return;
+    }
+    Map<ServerName, List<HRegionInfo>> assignment = master
+        .getAssignmentManager().getAssignments();
+
+    // Every available server gets an entry: a copy of its current regions,
+    // or an empty list if it has none.
+    Map<ServerName, List<HRegionInfo>> map =
+        new HashMap<ServerName, List<HRegionInfo>>();
+    for (ServerName server : availableServers) {
+      List<HRegionInfo> regions = assignment.get(server);
+      if (regions == null) {
+        map.put(server, new ArrayList<HRegionInfo>());
+      } else {
+        map.put(server, new ArrayList<HRegionInfo>(regions));
+      }
+    }
+    executePlans(master.getAssignmentManager().balancer.balanceCluster(map));
+  }
+
+  /**
+   * Balance the regions of a table among the given servers. Regions which
+   * are not on one of these servers get a plan to a randomly chosen one.
+   * @param table the name of the table
+   * @param servers the servers the table may use
+   * @throws IOException if the regions of the table cannot be scanned
+   */
+  private synchronized void balanceTable(String table, List<ServerName> servers)
+      throws IOException {
+    Map<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(
+        master.getConfiguration(), Bytes.toBytes(table), false);
+    Map<ServerName, List<HRegionInfo>> assignment =
+        new HashMap<ServerName, List<HRegionInfo>>();
+    Map<HRegionInfo, ServerName> wrongAssignment =
+        new HashMap<HRegionInfo, ServerName>();
+
+    // Collect the assignments.
+    for (Entry<HRegionInfo, ServerName> e : regions.entrySet()) {
+      ServerName current = e.getValue();
+      if (servers.contains(current)) {
+        List<HRegionInfo> hosted = assignment.get(current);
+        if (hosted == null) {
+          hosted = new ArrayList<HRegionInfo>();
+          assignment.put(current, hosted);
+        }
+        hosted.add(e.getKey());
+      } else {
+        // If these regions are assigned to wrong servers (maybe the default
+        // group check didn't find them), reassign them.
+        wrongAssignment.put(e.getKey(), current);
+      }
+    }
+    // Copy into a list of our own: the balancer may hand back null when it
+    // has nothing to move, and its list need not be modifiable.
+    List<RegionPlan> plans = new ArrayList<RegionPlan>();
+    List<RegionPlan> balanced = master.getAssignmentManager().balancer
+        .balanceCluster(assignment);
+    if (balanced != null) {
+      plans.addAll(balanced);
+    }
+    for (Entry<HRegionInfo, ServerName> e : wrongAssignment.entrySet()) {
+      plans.add(new RegionPlan(e.getKey(), e.getValue(), master
+          .getAssignmentManager().balancer.randomAssignment(e.getKey(), servers)));
+    }
+    executePlans(plans);
+  }
+
+  /**
+   * Hand every plan to the assignment manager.
+   * @param plans the plans to execute, may be null or empty
+   */
+  private void executePlans(List<RegionPlan> plans) {
+    if (plans == null || plans.isEmpty()) {
+      return;
+    }
+    for (RegionPlan plan : plans) {
+      LOG.info("balance " + plan);
+      master.assignmentManager.balance(plan);
+    }
+  }
+
+  /**
+   * Balance the region load of a group.
+   * @param groupName
+   *          The name of group you want to balance
+   * @return true if the balance was run; false if the group is unknown or
+   *         {@link #shouldRunBalance()} refuses to run now
+   * @throws IOException
+   *           if a remote or network exception occurs
+   */
+  public boolean balanceGroup(String groupName) throws IOException {
+    GroupInfo group = groupName == null ? null : getGroup(groupName);
+    if (group == null) {
+      return false;
+    }
+    synchronized (master.getBalancer()) {
+      if (!shouldRunBalance()) {
+        LOG.info("System is been balance state, please try later...");
+        return false;
+      }
+      this.master.setLoadBalancerRunning(true);
+      try {
+        balanceGroup(group.getServers());
+      } finally {
+        master.setLoadBalancerRunning(false);
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Carry out a table level region balance, find all regions and available
+   * servers of the table, then use GroupLoadBalance.balanceCluster()
+   * to balance load.
+   *
+   * @param tableName the name of table you want to balance
+   * @return true if the balance was run; false if the name is null, the table
+   *         has at most one available server, or {@link #shouldRunBalance()}
+   *         refuses to run now
+   * @throws IOException
+   *           if a remote or network exception occurs
+   */
+  public boolean balanceTable(String tableName) throws IOException {
+    if (tableName == null) {
+      return false;
+    }
+    List<ServerName> servers = getAvailableServer(tableName);
+    if (servers == null || servers.size() <= 1) {
+      return false;
+    }
+    synchronized (master.getBalancer()) {
+      if (!shouldRunBalance()) {
+        LOG.info("System is been balance state, please try later...");
+        return false;
+      }
+      master.setLoadBalancerRunning(true);
+      try {
+        balanceTable(tableName, servers);
+      } finally {
+        master.setLoadBalancerRunning(false);
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Change table's group.attribute.
+   * The table is disabled, modified and enabled again.
+   *
+   * @param conf the configuration used to create the admin client
+   * @param groups
+   *          table group property ,more than one group,use ',' as splitter.
+   * @param tableName
+   *          The name of table you want to change
+   * @return true if change successfully, false if an argument is null or no
+   *         descriptor is returned for the table
+   * @throws IOException
+   *           if a remote or network exception occurs
+   */
+  public static boolean changeTableGroup(Configuration conf, String groups,
+      String tableName) throws IOException {
+    if (groups == null || tableName == null) {
+      return false;
+    }
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      HTableDescriptor des = admin.getTableDescriptor(Bytes.toBytes(tableName));
+      if (des == null) {
+        return false;
+      }
+      byte[] gbyte = Bytes.toBytes(groups);
+      admin.disableTable(tableName);
+      des.setValue(GroupInfo.GROUP_KEY, gbyte);
+      admin.modifyTable(des.getName(), des);
+      admin.enableTable(tableName);
+      return true;
+    } finally {
+      // Release the admin client on every path.
+      admin.close();
+    }
+  }
+
+  /**
+   * Carry out the server movement.
+   *
+   * @param plans
+   *          list of {@link GroupOperations#ServerMovingPlan}
+   * @return successfully moved plans.
+   * @throws IOException
+   */
+  public synchronized List<ServerPlan> moveServer(List<ServerPlan> plans)
+      throws IOException {
+    if (plans == null) {
+      return null;
+    }
+    // Keep the plans which pass the checks; their servers are marked as
+    // being in transition from here on.
+    List<ServerPlan> pls = new ArrayList<ServerPlan>();
+    for (ServerPlan pl : plans) {
+      if (checkAndAddToMovingServers(pl)) {
+        pls.add(pl);
+      }
+    }
+    LOG.debug("Move Server plans:" + pls);
+    boolean checkDefault = false;
+    try {
+      for (ServerPlan pl : pls) {
+        LOG.debug("Plan move " + pl.getServername().getHostAndPort()
+            + " from group " + pl.getSourceGroup() + " to group "
+            + pl.getTargetGroup());
+        // Take the server out of its group, move its regions away, then
+        // register it with the target group.
+        removeServer(pl.getServername(), true);
+        handleDrainServer(master, pl.getServername());
+        if (addServer(pl.getServername(), pl.getTargetGroup())) {
+          checkDefault = true;
+        }
+        LOG.debug("Successfully move " + pl.getServername().getHostAndPort()
+            + " from group " + pl.getSourceGroup() + " to group "
+            + pl.getTargetGroup());
+
+      }
+    } finally {
+      // Remove servers from transition,if there are plans which are not
+      // successful, abort these movements.
+      for (ServerPlan pl : pls) {
+        removeFromTransition(pl.getServername());
+      }
+    }
+    if (checkDefault) {
+      checkDefaultGroup();
+    }
+    return pls;
+  }
+
+  /**
+   * Move out regions of this server.
+   * Repeats until the server has no regions, one bulk unassignment reports
+   * success, or the thread is interrupted.
+   * @param server master service
+   * @param sn the server to be drained.
+   * @throws IOException
+   */
+  private void handleDrainServer(final Server server, ServerName sn)
+      throws IOException {
+    while (true) {
+      List<HRegionInfo> regions = getRegionOfServer(sn);
+      if (regions.isEmpty()) {
+        break;
+      }
+      LOG.info("Moving " + regions.size() + " regions.");
+      BulkUnassigner bd = new BulkUnassigner(server, regions);
+      try {
+        if (bd.bulkAssign()) {
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Moving was interrupted");
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+  }
+
+  /**
+   * Stores the plan for the move of an individual server.
+   *
+   * Contains the {@link ServerName} of the server being moved, the name of
+   * the group the server should be moved from and the name of the group the
+   * server should be moved to.
+   *
+   * The comparable implementation of this class compares only the ServerName
+   * information and not the source/dest group info; equals() and hashCode()
+   * follow the same rule.
+   */
+  public static class ServerPlan implements Comparable<ServerPlan> {
+    private ServerName servername;
+    private String sourceGroup;
+    private String targetGroup;
+
+    /**
+     * @param serverName the server to move
+     * @param sourceGroup the name of the group the server leaves
+     * @param targetGroup the name of the group the server joins
+     */
+    public ServerPlan(ServerName serverName, String sourceGroup,
+        String targetGroup) {
+      this.servername = serverName;
+      this.sourceGroup = sourceGroup;
+      this.targetGroup = targetGroup;
+    }
+
+    /**
+     * Get the ServerName
+     * @return the server to move
+     */
+    public ServerName getServername() {
+      return servername;
+    }
+
+    /**
+     * Set the serverName
+     * @param servername the server to move
+     */
+    public void setServerName(ServerName servername) {
+      this.servername = servername;
+    }
+
+    /**
+     * Get the source group.
+     * @return the name of the source group
+     */
+    public String getSourceGroup() {
+      return sourceGroup;
+    }
+
+    /**
+     * Set the source group.
+     * @param sourceGroup the name of the source group
+     */
+    public void setSourceGroup(String sourceGroup) {
+      this.sourceGroup = sourceGroup;
+    }
+
+    /**
+     * Get the target group.
+     * @return the name of the target group
+     */
+    public String getTargetGroup() {
+      return targetGroup;
+    }
+
+    /**
+     * Set the target group.
+     * @param targetGroup the name of the target group
+     */
+    public void setTargetGroup(String targetGroup) {
+      this.targetGroup = targetGroup;
+    }
+
+    @Override
+    public int compareTo(ServerPlan other) {
+      return getServername().compareTo(other.getServername());
+    }
+
+    @Override
+    public String toString() {
+      return getServername() + ":" + getSourceGroup() + ":" + getTargetGroup();
+    }
+
+    /**
+     * Two plans are equal if they move the same server, which keeps equals()
+     * in line with hashCode() and compareTo().
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof ServerPlan)) {
+        return false;
+      }
+      return servername.equals(((ServerPlan) obj).servername);
+    }
+
+    @Override
+    public int hashCode() {
+      return servername.hashCode();
+    }
+  }
+
+  /**
+   * Perform bulk unassignment of regions in the server which will be drained.
+   */
+  class BulkUnassigner extends BulkAssigner {
+    // The regions to unassign.
+    private final List<HRegionInfo> regions;
+
+    BulkUnassigner(final Server server, final List<HRegionInfo> regions) {
+      super(server);
+      this.regions = regions;
+    }
+
+    /**
+     * Submit one unassign task per region; regions for which the assignment
+     * manager reports a transition already are skipped.
+     */
+    @Override
+    protected void populatePool(ExecutorService pool) {
+      for (HRegionInfo region : regions) {
+        if (master.getAssignmentManager().isRegionInTransition(region) != null) {
+          continue;
+        }
+        final HRegionInfo hri = region;
+        pool.execute(new Runnable() {
+          public void run() {
+            master.getAssignmentManager().unassign(hri);
+          }
+        });
+      }
+    }
+
+    /**
+     * Poll the regions in transition until none of the regions of this
+     * unassigner is among them, the server is stopped or the timeout is over.
+     * The poll interval is read from "hbase.master.event.waiting.time".
+     * @param timeout how long to wait, in milliseconds
+     * @return true if the last poll found none of the regions in transition
+     *         (also true if no poll was made at all)
+     */
+    @Override
+    protected boolean waitUntilDone(long timeout) throws InterruptedException {
+      long startTime = System.currentTimeMillis();
+      long remaining = timeout;
+      Map<String, RegionState> regionInTransition = null;
+      int waitingTimeForEvents = server.getConfiguration().getInt(
+          "hbase.master.event.waiting.time", 1000);
+      boolean finished = true;
+      while (!server.isStopped() && remaining > 0) {
+        Thread.sleep(waitingTimeForEvents);
+        regionInTransition = master.getAssignmentManager()
+            .getRegionsInTransition();
+        finished = true;
+        for (HRegionInfo info : this.regions) {
+          LOG.debug("HRegion info:" + info.getEncodedName());
+          if (regionInTransition.get(info.getEncodedName()) != null) {
+            finished = false;
+          }
+        }
+        LOG.debug("Transaction Regions " + regionInTransition);
+        if (finished) {
+          break;
+        }
+        remaining = timeout - (System.currentTimeMillis() - startTime);
+      }
+      return finished;
+    }
+  }
+
+  /**
+   * Decide whether a balance run may start now.
+   * @return false if the balancer is running already, regions are in
+   *         transition, dead servers are being processed or a schema change
+   *         is in progress; true otherwise
+   */
+  private boolean shouldRunBalance() {
+    if (master.isLoadBalancerRunning()) {
+      LOG.debug("Load balancer is currently running. "
+          + "Skipping the current execution.");
+      return false;
+    }
+
+    // Only allow one balance run at at time.
+    if (master.getAssignmentManager().isRegionsInTransition()) {
+      LOG.debug("Not running balancer because "
+          + master.assignmentManager.getRegionsInTransition().size()
+          + " region(s) in transition: "
+          + org.apache.commons.lang.StringUtils
+              .abbreviate(master.assignmentManager.getRegionsInTransition()
+                  .toString(), 256));
+      return false;
+    }
+    if (master.getServerManager().areDeadServersInProgress()) {
+      LOG.debug("Not running balancer because processing dead "
+          + "regionserver(s): " + master.getServerManager().getDeadServers());
+      return false;
+    }
+    if (master.getSchemaChangeTracker().isSchemaChangeInProgress()) {
+      LOG.debug("Schema change operation is in progress. Waiting for "
+          + "it to complete before running the load balancer.");
+      return false;
+    }
+    return true;
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1302790) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -422,9 +422,16 @@ this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); - this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.executorService); - this.balancer = LoadBalancerFactory.getLoadBalancer(conf); + if (conf.getBoolean(GroupInfoManager.GROUP_ENABLE_KEY, false)) { + this.assignmentManager = new RSGroupAssignmentManager(this, + serverManager, this.catalogTracker, this.executorService); + this.balancer = this.assignmentManager.balancer; + } else { + this.assignmentManager = new AssignmentManager(this, serverManager, + this.catalogTracker, this.executorService); + this.balancer = LoadBalancerFactory.getLoadBalancer(conf); + } + zooKeeper.registerListenerFirst(assignmentManager); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, @@ -505,7 +512,11 @@ this.executorService = new
ExecutorService(getServerName().toString()); - this.serverManager = new ServerManager(this, this); + if (conf.getBoolean(GroupInfoManager.GROUP_ENABLE_KEY, false)) { + this.serverManager = new GroupServerManager(this, this); + } else { + this.serverManager = new ServerManager(this, this); + } status.setStatus("Initializing ZK system trackers"); initializeZKBasedSystemTrackers(); @@ -1066,6 +1077,14 @@ return balancerRan; } + public void setLoadBalancerRunning(boolean loadBalancerRunning) { + this.loadBalancerRunning = loadBalancerRunning; + } + + public LoadBalancer getBalancer() { + return balancer; + } + enum BalanceSwitchMode { SYNC, ASYNC Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1302790) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -105,13 +105,13 @@ protected Server master; - private ServerManager serverManager; + protected ServerManager serverManager; private CatalogTracker catalogTracker; private TimeoutMonitor timeoutMonitor; - private LoadBalancer balancer; + protected LoadBalancer balancer; /** * Map of regions to reopen after the schema of a table is changed. Key - @@ -3107,7 +3107,7 @@ * If a new server has come in and it has no regions, it will not be included * in the returned Map. */ - Map> getAssignments() { + public Map> getAssignments() { // This is an EXPENSIVE clone. Cloning though is the safest thing to do. // Can't let out original since it can change and at least the loadbalancer // wants to iterate this exported list. 
We need to synchronize on regions
Index: src/main/java/org/apache/hadoop/hbase/master/GroupServerManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/GroupServerManager.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/master/GroupServerManager.java (revision 0)
@@ -0,0 +1,89 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+
+/**
+ * A ServerManager which keeps the region server groups up to date.
+ * <p>
+ * New servers are registered with the group manager and expired servers
+ * are removed from it.
+ */
+public class GroupServerManager extends ServerManager {
+  MasterServices services;
+
+  public GroupServerManager(Server master, MasterServices services)
+      throws ZooKeeperConnectionException {
+    super(master, services);
+    this.services = services;
+  }
+
+  /**
+   * Fetches the assignment manager from the master services, cast to the
+   * group aware implementation.
+   */
+  private RSGroupAssignmentManager groupAssignmentManager() {
+    return (RSGroupAssignmentManager) services.getAssignmentManager();
+  }
+
+  /**
+   * Expires the server, then removes it from the group manager without
+   * asking for the group configuration to be written.
+   */
+  @Override
+  public synchronized void expireServer(final ServerName serverName) {
+    super.expireServer(serverName);
+    groupAssignmentManager().getInfoManager().removeServer(serverName, false);
+  }
+
+  /**
+   * Records the new server, then registers it with the group manager.
+   */
+  @Override
+  void recordNewServer(final ServerName serverName, final HServerLoad hsl) {
+    super.recordNewServer(serverName, hsl);
+    groupAssignmentManager().getInfoManager()
+        .addServerAndCheckDefaultGroup(serverName);
+  }
+
+  /**
+   * Processes the report; a server which was not online before the report
+   * is registered with the group manager afterwards.
+   */
+  @Override
+  void regionServerReport(ServerName sn, HServerLoad hsl)
+      throws YouAreDeadException, PleaseHoldException {
+    // Sampled before the parent implementation handles the report.
+    final boolean unknownBefore = this.getOnlineServers().get(sn) == null;
+    super.regionServerReport(sn, hsl);
+    // A group may just have received its first server, so the default group
+    // has to be checked for regions which should move back.
+    if (unknownBefore) {
+      groupAssignmentManager().getInfoManager()
+          .addServerAndCheckDefaultGroup(sn);
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/master/GroupLoadBalancer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/GroupLoadBalancer.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/master/GroupLoadBalancer.java (revision 0)
@@ -0,0 +1,173 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Provide region move plans according to group information.
+ */ +public class GroupLoadBalancer extends DefaultLoadBalancer { + private static final Log LOG = LogFactory.getLog(GroupLoadBalancer.class); + static Random RANDOM = new Random(); + GroupInfoManager manager; + public void setManager(GroupInfoManager manager) { + this.manager = manager; + } + + /** + * Generates a round robin assignment plan, used on cluster startup using a + * simple round-robin assignment. + * server group information. + * @param regions all regions + * @param servers all servers + * @return map of server to the regions it should take, or null if no + * assignment is possible (ie. no regions or no servers) + */ + @Override + public Map> roundRobinAssignment( + List regions, List servers) { + if (regions.isEmpty() || servers.isEmpty()) { + return null; + } + + Map> ret; + ret = new HashMap>(); + for (HRegionInfo info : regions) { + // Find available servers from parameter list; + List availibleServer = manager.getAvailableServer(servers, + info.getTableNameAsString()); + int lightestLoad = Integer.MAX_VALUE; + List leastLoad = null; + + // find the least load server to assign this region + for (ServerName server : availibleServer) { + if (ret.get(server) == null) { + leastLoad = new ArrayList(); + ret.put(server, leastLoad); + break; + } else if (ret.get(server).size() < lightestLoad) { + leastLoad = ret.get(server); + } + } + if (leastLoad == null) { + LOG.error("There is no available server for region " + info); + } else { + leastLoad.add(info); + } + } + return ret; + } + + /** + * Balance the cluster.Acturally balance each group respectively + */ + @Override + public List balanceCluster( + Map> clusterState) { + List ret = new ArrayList(); + for (GroupInfo group : manager.getGroups().values()) { + HashSet serverSet = group.getServers(); + Map> groupClusterState; + groupClusterState = new TreeMap>(); + for (ServerName name : clusterState.keySet()) { + if (serverSet.contains(name)) { + groupClusterState.put(name, clusterState.get(name)); + } + 
} + List plans = super.balanceCluster(groupClusterState); + if (plans != null) { + ret.addAll(plans); + } + } + return ret; + } + + /** + * Do a retain assignment,if the exiting plan is not legal according to + * region's group information,the region will be random assigned to a + * available server of its group. + */ + public Map> retainAssignment( + Map regions, List servers) { + Map> assignments; + assignments = new TreeMap>(); + for (ServerName server : servers) { + assignments.put(server, new ArrayList()); + } + for (Map.Entry region : regions.entrySet()) { + ServerName server = region.getValue(); + //Check the assignment + if (server != null && manager.inSameGroup(region.getKey(), server)) { + assignments.get(server).add(region.getKey()); + } else { + List avaServers = manager.getAvailableServer(region + .getKey().getTableNameAsString()); + assignments.get(avaServers.get(RANDOM.nextInt(avaServers.size()))).add( + region.getKey()); + } + } + return assignments; + } + + /** + * Generates an immediate assignment plan to be used by a new master for + * regions in transition that do not have an already known destination. + *

+ * Takes a list of regions that need immediate assignment and a list of all + * available servers. Returns a map of regions to the server they should be + * assigned to. + *

+ * This method will return quickly and does not do any intelligent + * balancing. The goal is to make a fast decision not the best decision + * possible. + *

+ * This method will check region's group attribute and assign them according + * to group information. + *

+ * Currently this is random. + * + * @param regions + * @param servers + * @return map of regions to the server it should be assigned to + */ + + @Override + public Map immediateAssignment( + List regions, List servers) { + Map assignments = new TreeMap(); + for (HRegionInfo region : regions) { + List availibleServer = manager.getAvailableServer(servers, + region.getTableNameAsString()); + assignments + .put(region, availibleServer.get(RANDOM.nextInt(availibleServer.size()))); + } + return assignments; + } +}