diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 2b132dd..7e66c27 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -209,8 +209,14 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService for(HBaseProtos.ServerName el: request.getServersList()) { servers.add(Address.fromParts(el.getHostName(), el.getPort())); } + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveServers(servers, request.getTargetGroup()); + } checkPermission("moveServers"); groupAdminServer.moveServers(servers, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveServers(servers, request.getTargetGroup()); + } response = builder.build(); } catch (IOException e) { ResponseConverter.setControllerException(controller, e); @@ -230,9 +236,15 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService for(TableProtos.TableName tableName: request.getTableNameList()) { tables.add(ProtobufUtil.toTableName(tableName)); } + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup()); + } checkPermission("moveTables"); groupAdminServer.moveTables(tables, request.getTargetGroup()); response = builder.build(); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup()); + } } catch (IOException e) { ResponseConverter.setControllerException(controller, e); } @@ -252,8 +264,16 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService for (TableProtos.TableName tableName : request.getTableNameList()) { tables.add(ProtobufUtil.toTableName(tableName)); } + if 
(master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveServersAndTables(servers, tables, + request.getTargetGroup()); + } checkPermission("moveServersAndTables"); groupAdminServer.moveServersAndTables(servers, tables, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, + request.getTargetGroup()); + } } catch (IOException e) { ResponseConverter.setControllerException(controller, e); } @@ -268,9 +288,15 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService try { AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder(); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName()); + } checkPermission("addRSGroup"); groupAdminServer.addRSGroup(request.getRSGroupName()); response = builder.build(); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postAddRSGroup(request.getRSGroupName()); + } } catch (IOException e) { ResponseConverter.setControllerException(controller, e); } @@ -285,8 +311,14 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService try { RemoveRSGroupResponse.Builder builder = RemoveRSGroupResponse.newBuilder(); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName()); + } checkPermission("removeRSGroup"); groupAdminServer.removeRSGroup(request.getRSGroupName()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postRemoveRSGroup(request.getRSGroupName()); + } response = builder.build(); } catch (IOException e) { ResponseConverter.setControllerException(controller, e); @@ -300,8 +332,16 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService RpcCallback<BalanceRSGroupResponse> done) { BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder(); try { + if
(master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preBalanceRSGroup(request.getRSGroupName()); + } checkPermission("balanceRSGroup"); - builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName())); + boolean balancerRan = groupAdminServer.balanceRSGroup(request.getRSGroupName()); + builder.setBalanceRan(balancerRan); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postBalanceRSGroup(request.getRSGroupName(), + balancerRan); + } } catch (IOException e) { ResponseConverter.setControllerException(controller, e); builder.setBalanceRan(false); @@ -358,8 +398,14 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService for (HBaseProtos.ServerName el : request.getServersList()) { servers.add(Address.fromParts(el.getHostName(), el.getPort())); } + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preRemoveServers(servers); + } checkPermission("removeServers"); groupAdminServer.removeServers(servers); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postRemoveServers(servers); + } } catch (IOException e) { ResponseConverter.setControllerException(controller, e); } diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index e4f4c86..a2d3589 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -112,9 +112,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { RSGroupInfoManager manager = getRSGroupInfoManager(); synchronized (manager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName); - } Address firstServer = servers.iterator().next(); //we only allow a move 
from a single source group //so this should be ok @@ -235,9 +232,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { serversInTransition.remove(server); } } - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName); - } LOG.info("Move server done: "+sourceGroupName+"->"+targetGroupName); } } @@ -254,10 +248,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } RSGroupInfoManager manager = getRSGroupInfoManager(); synchronized (manager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup); - } - if(targetGroup != null) { RSGroupInfo destGroup = manager.getRSGroup(targetGroup); if(destGroup == null) { @@ -276,9 +266,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } manager.moveTables(tables, targetGroup); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup); - } } for(TableName table: tables) { if (master.getAssignmentManager().getTableStateManager().isTableState(table, @@ -302,22 +289,13 @@ public class RSGroupAdminServer implements RSGroupAdmin { @Override public void addRSGroup(String name) throws IOException { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preAddRSGroup(name); - } getRSGroupInfoManager().addRSGroup(new RSGroupInfo(name)); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postAddRSGroup(name); - } } @Override public void removeRSGroup(String name) throws IOException { RSGroupInfoManager manager = getRSGroupInfoManager(); synchronized (manager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preRemoveRSGroup(name); - } RSGroupInfo groupInfo = getRSGroupInfoManager().getRSGroup(name); if(groupInfo == null) { throw new ConstraintException("Group "+name+" does not exist"); @@ -338,9 +316,6 
@@ public class RSGroupAdminServer implements RSGroupAdmin { } } manager.removeRSGroup(name); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postRemoveRSGroup(name); - } } } @@ -352,9 +327,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { boolean balancerRan; synchronized (balancer) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preBalanceRSGroup(groupName); - } if (getRSGroupInfo(groupName) == null) { throw new ConstraintException("Group does not exist: "+groupName); } @@ -397,9 +369,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { LOG.info("Group balance "+groupName+" completed after "+ (System.currentTimeMillis()-startTime)+" seconds"); } - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan); - } } return balancerRan; } @@ -428,15 +397,9 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. 
synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preRemoveServers(servers); - } //check the set of servers checkForDeadOrOnlineServers(servers); rsGroupInfoManager.removeServers(servers); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postRemoveServers(servers); - } LOG.info("Remove decommissioned servers " + servers + " from rsgroup done."); } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java index 081c0a3..4320fea 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java @@ -19,7 +19,14 @@ */ package org.apache.hadoop.hbase.rsgroup; -import com.google.common.collect.Sets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,8 +40,13 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.ServerManager; import 
org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.net.Address; @@ -50,13 +62,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import java.util.Iterator; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import com.google.common.collect.Sets; @Category({MediumTests.class}) public class TestRSGroups extends TestRSGroupsBase { @@ -64,7 +70,7 @@ public class TestRSGroups extends TestRSGroupsBase { private static HMaster master; private static boolean init = false; private static RSGroupAdminEndpoint RSGroupAdminEndpoint; - + private static CPMasterObserver observer; @BeforeClass public static void setUp() throws Exception { @@ -75,7 +81,7 @@ public class TestRSGroups extends TestRSGroupsBase { HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName()); TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, - RSGroupAdminEndpoint.class.getName()); + RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName()); TEST_UTIL.getConfiguration().setBoolean( HConstants.ZOOKEEPER_USEMULTI, true); @@ -100,8 +106,10 @@ public class TestRSGroups extends TestRSGroupsBase { admin.setBalancerRunning(false,true); rsGroupAdmin = new VerifyingRSGroupAdminClient(new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration()); + MasterCoprocessorHost host = master.getMasterCoprocessorHost(); + observer = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName()); RSGroupAdminEndpoint = - master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0); + host.findCoprocessors(RSGroupAdminEndpoint.class).get(0); } @AfterClass @@ -141,6 +149,7 @@ public class TestRSGroups extends TestRSGroupsBase { } catch (Exception ex) { 
// ignore } + assertTrue(observer.preMoveServersCalled); TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { @@ -213,6 +222,9 @@ public class TestRSGroups extends TestRSGroupsBase { String groupName = tablePrefix+"_foo"; LOG.info("testNamespaceConstraint"); rsGroupAdmin.addRSGroup(groupName); + assertTrue(observer.preAddRSGroupCalled); + assertTrue(observer.postAddRSGroupCalled); + admin.createNamespace(NamespaceDescriptor.create(nsName) .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName) .build()); @@ -233,6 +245,8 @@ public class TestRSGroups extends TestRSGroupsBase { //test add non-existent group admin.deleteNamespace(nsName); rsGroupAdmin.removeRSGroup(groupName); + assertTrue(observer.preRemoveRSGroupCalled); + assertTrue(observer.postRemoveRSGroupCalled); try { admin.createNamespace(NamespaceDescriptor.create(nsName) .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo") @@ -253,6 +267,122 @@ public class TestRSGroups extends TestRSGroupsBase { it.next(); } + public static class CPMasterObserver extends BaseMasterObserver { + boolean preBalanceRSGroupCalled = false; + boolean postBalanceRSGroupCalled = false; + boolean preMoveServersCalled = false; + boolean postMoveServersCalled = false; + boolean preMoveTablesCalled = false; + boolean postMoveTablesCalled = false; + boolean preAddRSGroupCalled = false; + boolean postAddRSGroupCalled = false; + boolean preRemoveRSGroupCalled = false; + boolean postRemoveRSGroupCalled = false; + boolean preRemoveServersCalled = false; + boolean postRemoveServersCalled = false; + boolean preMoveServersAndTables = false; + boolean postMoveServersAndTables = false; + + @Override + public void preMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<Address>
servers, Set<TableName> tables, String targetGroup) throws IOException { + preMoveServersAndTables = true; + } + @Override + public void postMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<Address>
servers, Set<TableName> tables, String targetGroup) throws IOException { + postMoveServersAndTables = true; + } + @Override + public void preRemoveServers( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<Address>
servers) throws IOException { + preRemoveServersCalled = true; + } + @Override + public void postRemoveServers( + final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<Address>
servers) throws IOException { + postRemoveServersCalled = true; + } + @Override + public void preRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, + String name) throws IOException { + preRemoveRSGroupCalled = true; + } + @Override + public void postRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, + String name) throws IOException { + postRemoveRSGroupCalled = true; + } + @Override + public void preAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, + String name) throws IOException { + preAddRSGroupCalled = true; + } + @Override + public void postAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, + String name) throws IOException { + postAddRSGroupCalled = true; + } + @Override + public void preMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<TableName> tables, String targetGroup) throws IOException { + preMoveTablesCalled = true; + } + @Override + public void postMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<TableName> tables, String targetGroup) throws IOException { + postMoveTablesCalled = true; + } + @Override + public void preMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<Address>
servers, String targetGroup) throws IOException { + preMoveServersCalled = true; + } + + @Override + public void postMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx, + Set<Address>
servers, String targetGroup) throws IOException { + postMoveServersCalled = true; + } + @Override + public void preBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, + String groupName) throws IOException { + preBalanceRSGroupCalled = true; + } + @Override + public void postBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx, + String groupName, boolean balancerRan) throws IOException { + postBalanceRSGroupCalled = true; + } + } + @Test + public void testMoveServersAndTables() throws Exception { + super.testMoveServersAndTables(); + assertTrue(observer.preMoveServersAndTables); + assertTrue(observer.postMoveServersAndTables); + } + @Test + public void testTableMoveTruncateAndDrop() throws Exception { + super.testTableMoveTruncateAndDrop(); + assertTrue(observer.preMoveTablesCalled); + assertTrue(observer.postMoveTablesCalled); + } + + @Test + public void testRemoveServers() throws Exception { + super.testRemoveServers(); + assertTrue(observer.preRemoveServersCalled); + } + + @Test + public void testGroupBalance() throws Exception { + super.testGroupBalance(); + assertTrue(observer.preBalanceRSGroupCalled); + assertTrue(observer.postBalanceRSGroupCalled); + } + @Test public void testMisplacedRegions() throws Exception { final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");