From f40296afd66367be711f1687124b952575fe26cb Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Mon, 11 Dec 2017 22:19:31 -0800 Subject: [PATCH] HBASE-18946 Stochastic load balancer assigns replica regions to the same RS Changed core of AM#createAssignProcedure so we pass list of Regions to assign to the balancer en masse, in one lump. Let the balancer figure what to do with the fat assign. We get back a Map of servers to regions. We then transform that into an array of AssignProcedures to pass to the Assign executor. We sort the array so that meta and system tables are passed to the executor first (and so replicas are clumped together...). Internally the AM executor may divvy up the work into queues but all will be pre-assigned so we should have good distribution (round-robin) regardless of how the queue is processed. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java Cleanup around forceNewPlan. Was confusing. Added a Comparator to sort AssignProcedures so meta and system tables come ahead of user-space tables. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java Remove the forceNewPlan argument on createAssignProcedure. Didn't make sense given we were creating a new AssignProcedure; the arg had no effect. (createAssignProcedures) Recast to feed all regions to the balancer in bulk and to sort the return so meta and system tables take precedence. --- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 3 + .../hadoop/hbase/master/MasterRpcServices.java | 2 +- .../hbase/master/assignment/AssignProcedure.java | 46 +++++++++-- .../hbase/master/assignment/AssignmentManager.java | 96 ++++++++++++---------- .../master/assignment/MoveRegionProcedure.java | 4 +- .../assignment/RegionTransitionProcedure.java | 7 +- .../master/procedure/RecoverMetaProcedure.java | 2 +- .../master/procedure/ServerCrashProcedure.java | 3 +- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 2 +- .../org/apache/hadoop/hbase/wal/WALFactory.java | 2 +- .../master/assignment/TestAssignmentManager.java | 24 +++--- .../hbase/master/snapshot/TestAssignProcedure.java | 91 ++++++++++++++++++++ .../TestRegionMergeTransactionOnCluster.java | 2 +- .../TestSplitTransactionOnCluster.java | 2 +- 14 files changed, 210 insertions(+), 76 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestAssignProcedure.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 976ad79c68..ac0487165e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -880,6 +880,9 @@ public class ProcedureExecutor { public void submitProcedures(final Procedure[] procs) { Preconditions.checkArgument(lastProcId.get() >= 0); Preconditions.checkArgument(isRunning(), "executor not running"); + if (procs == null || procs.length <= 0) { + return; + } // Prepare procedure for (int i = 0; i < procs.length; ++i) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index ce85b66cb4..10f5299e6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -513,7 +513,7 @@ public class MasterRpcServices extends RSRpcServices master.cpHost.preAssign(regionInfo); } LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString()); - master.getAssignmentManager().assign(regionInfo, true); + master.getAssignmentManager().assign(regionInfo); if (master.cpHost != null) { master.cpHost.postAssign(regionInfo); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java index 33e04fb9e6..5555062f3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; +import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,6 +74,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto public class AssignProcedure extends RegionTransitionProcedure { private static final Log LOG = LogFactory.getLog(AssignProcedure.class); + /** + * Set to true when we need recalibrate -- choose a new target -- because original assign failed. + */ private boolean forceNewPlan = false; /** @@ -84,24 +88,24 @@ public class AssignProcedure extends RegionTransitionProcedure { */ protected volatile ServerName targetServer; + /** + * Comparator that will sort AssignProcedures so meta assigns come first, then system table + * assigns and finally user space assigns. + */ + public static final CompareAssignProcedure COMPARATOR = new CompareAssignProcedure(); + public AssignProcedure() { // Required by the Procedure framework to create the procedure on replay super(); } public AssignProcedure(final RegionInfo regionInfo) { - this(regionInfo, false); - } - - public AssignProcedure(final RegionInfo regionInfo, final boolean forceNewPlan) { super(regionInfo); - this.forceNewPlan = forceNewPlan; this.targetServer = null; } public AssignProcedure(final RegionInfo regionInfo, final ServerName destinationServer) { super(regionInfo); - this.forceNewPlan = false; this.targetServer = destinationServer; } @@ -361,4 +365,32 @@ public class AssignProcedure extends RegionTransitionProcedure { protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) { return env.getAssignmentManager().getAssignmentManagerMetrics().getAssignProcMetrics(); } + + /** + * Sort AssignProcedures such that meta and system assigns come first before user-space assigns. + * Have to do it this way w/ distinct Comparator because Procedure is already Comparable on + * 'Env'(?). + */ + public static class CompareAssignProcedure implements Comparator { + @Override + public int compare(AssignProcedure left, AssignProcedure right) { + if (left.getRegionInfo().isMetaRegion()) { + if (right.getRegionInfo().isMetaRegion()) { + return RegionInfo.COMPARATOR.compare(left.getRegionInfo(), right.getRegionInfo()); + } + return -1; + } else if (left.getRegionInfo().isMetaRegion()) { + return +1; + } + if (left.getRegionInfo().getTable().isSystemTable()) { + if (right.getRegionInfo().getTable().isSystemTable()) { + return RegionInfo.COMPARATOR.compare(left.getRegionInfo(), right.getRegionInfo()); + } + return -1; + } else if (left.getRegionInfo().getTable().isSystemTable()) { + return +1; + } + return RegionInfo.COMPARATOR.compare(left.getRegionInfo(), right.getRegionInfo()); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index cebe0b0465..54d0206879 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -461,7 +461,7 @@ public class AssignmentManager implements ServerListener { proc = createAssignProcedure(metaRegionInfo, serverName); } else { LOG.debug("Assigning " + metaRegionInfo.getRegionNameAsString()); - proc = createAssignProcedure(metaRegionInfo, false); + proc = createAssignProcedure(metaRegionInfo); } ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); } @@ -523,11 +523,7 @@ public class AssignmentManager implements ServerListener { } public void assign(final RegionInfo regionInfo) throws IOException { - assign(regionInfo, true); - } - - public void assign(final RegionInfo regionInfo, final boolean forceNewPlan) throws IOException { - AssignProcedure proc = createAssignProcedure(regionInfo, forceNewPlan); + AssignProcedure proc = createAssignProcedure(regionInfo); ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); } @@ -602,19 +598,53 @@ public class AssignmentManager implements ServerListener { // RegionTransition procedures helpers // ============================================================================================ - public AssignProcedure[] createAssignProcedures(final Collection regionInfo) { - return createAssignProcedures(regionInfo, false); + public AssignProcedure[] createAssignProcedures(final List hris) { + if (hris.isEmpty()) { + return null; + } + try { + // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do + // a better job if it has all the assignments in the one lump. + Map> assignments = getBalancer().roundRobinAssignment(hris, + this.master.getServerManager().createDestinationServersList(null)); + return createAssignProcedure(assignments, hris.size()); + } catch (HBaseIOException hioe) { + LOG.warn("Failed roundRobinAssignment", hioe); + } + // Fall through here if the above 'bulk' balancer call failed. This is last resort if above + // attempt at bulk assigned failed for whatever reason. + int index = 0; + AssignProcedure [] procedures = new AssignProcedure[hris.size()]; + for (RegionInfo hri : hris) { + procedures[index++] = createAssignProcedure(hri); + } + return procedures; } - public AssignProcedure[] createAssignProcedures(final Collection regionInfo, - final boolean forceNewPlan) { - if (regionInfo.isEmpty()) return null; - final AssignProcedure[] procs = new AssignProcedure[regionInfo.size()]; - int index = 0; - for (RegionInfo hri: regionInfo) { - procs[index++] = createAssignProcedure(hri, forceNewPlan); + // Make this static for the method below where we use it typing the AssignProcedure array we + // return as result. + private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {}; + + /** + * @param assignments Map of assignments from which we produce an array of assignments. + * @param size Count of assignments to make (the caller may know the total count) + * @return Assignments made from the passed in assignments + */ + AssignProcedure[] createAssignProcedure(Map> assignments, int size) { + List procedures = + new ArrayList(size > 0? size: 8/*Choose an arbitrary size*/); + for (Map.Entry> e: assignments.entrySet()) { + for (RegionInfo ri: e.getValue()) { + AssignProcedure ap = createAssignProcedure(ri, e.getKey()); + ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName()); + procedures.add(ap); + } } - return procs; + if (procedures.size() > 0) { + // Sort the procedures so meta and system regions are first in the returned array. + procedures.sort(AssignProcedure.COMPARATOR); + } + return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE); } // Needed for the following method so it can type the created Array we return @@ -669,9 +699,8 @@ public class AssignmentManager implements ServerListener { return createReopenProcedures(regionStates.getRegionsOfTable(tableName)); } - public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, - final boolean forceNewPlan) { - AssignProcedure proc = new AssignProcedure(regionInfo, forceNewPlan); + public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) { + AssignProcedure proc = new AssignProcedure(regionInfo); proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName()); return proc; } @@ -1257,12 +1286,10 @@ public class AssignmentManager implements ServerListener { setFailoverCleanupDone(true); - // assign offline regions + // Assign offline regions st = System.currentTimeMillis(); - for (RegionInfo regionInfo: getOrderedRegions(regionsToAssign)) { - master.getMasterProcedureExecutor().submitProcedure( - createAssignProcedure(regionInfo, false)); - } + master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager(). + createAssignProcedures(regionsToAssign)); et = System.currentTimeMillis(); LOG.info("[STEP-3] " + StringUtils.humanTimeDiff(et - st)); @@ -1366,27 +1393,6 @@ public class AssignmentManager implements ServerListener { return new Pair(ritCount, states.size()); } - /** - * Used when assign regions, this method will put system regions in - * front of user regions - * @param regions - * @return A list of regions with system regions at front - */ - public List getOrderedRegions( - final List regions) { - if (regions == null) return Collections.emptyList(); - - List systemList = new ArrayList<>(); - List userList = new ArrayList<>(); - for (RegionInfo hri : regions) { - if (hri.getTable().isSystemTable()) systemList.add(hri); - else userList.add(hri); - } - // Append userList to systemList - systemList.addAll(userList); - return systemList; - } - // ============================================================================================ // TODO: Region State In Transition // ============================================================================================ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java index 4caed2895d..5940f2fe31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -71,7 +71,7 @@ public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure futureA = submitProcedure(am.createAssignProcedure(hri, false)); + final Future futureA = submitProcedure(am.createAssignProcedure(hri)); // wait first assign waitOnFuture(futureA); @@ -396,7 +396,7 @@ public class TestAssignmentManager { // Second should be a noop. We should recognize region is already OPEN internally // and skip out doing nothing. // wait second assign - final Future futureB = submitProcedure(am.createAssignProcedure(hri, false)); + final Future futureB = submitProcedure(am.createAssignProcedure(hri)); waitOnFuture(futureB); am.getRegionStates().isRegionInState(hri, State.OPEN); // TODO: What else can we do to ensure just a noop. @@ -419,7 +419,7 @@ public class TestAssignmentManager { rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); // assign the region first - waitOnFuture(submitProcedure(am.createAssignProcedure(hri, false))); + waitOnFuture(submitProcedure(am.createAssignProcedure(hri))); final Future futureA = submitProcedure(am.createUnassignProcedure(hri, null, false)); @@ -488,7 +488,7 @@ public class TestAssignmentManager { private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) { RegionInfo hri = createRegionInfo(tableName, regionId); - AssignProcedure proc = am.createAssignProcedure(hri, false); + AssignProcedure proc = am.createAssignProcedure(hri); master.getMasterProcedureExecutor().submitProcedure(proc); return proc; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestAssignProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestAssignProcedure.java new file mode 100644 index 0000000000..2e232d3483 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestAssignProcedure.java @@ -0,0 +1,91 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.master.snapshot; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static junit.framework.TestCase.assertTrue; + +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.master.assignment.AssignProcedure; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestAssignProcedure { + @Rule public TestName name = new TestName(); + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). + withTimeout(this.getClass()). + withLookingForStuckThread(true). + build(); + + @Test + public void testSimpleComparator() { + List procedures = new ArrayList(); + RegionInfo user1 = RegionInfoBuilder.newBuilder(TableName.valueOf("user_space1")).build(); + procedures.add(new AssignProcedure(user1)); + RegionInfo user2 = RegionInfoBuilder.newBuilder(TableName.valueOf("user_space2")).build(); + procedures.add(new AssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO)); + procedures.add(new AssignProcedure(user2)); + RegionInfo system = RegionInfoBuilder.newBuilder(TableName.NAMESPACE_TABLE_NAME).build(); + procedures.add(new AssignProcedure(system)); + procedures.sort(AssignProcedure.COMPARATOR); + assertTrue(procedures.get(0).isMeta()); + assertTrue(procedures.get(1).getRegionInfo().getTable().equals(TableName.NAMESPACE_TABLE_NAME)); + } + + @Test + public void testComparatorWithMetas() { + List procedures = new ArrayList(); + RegionInfo user1 = RegionInfoBuilder.newBuilder(TableName.valueOf("user_space1")).build(); + procedures.add(new AssignProcedure(user1)); + RegionInfo meta2 = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME). + setStartKey(Bytes.toBytes("002")).build(); + procedures.add(new AssignProcedure(meta2)); + RegionInfo meta1 = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME). + setStartKey(Bytes.toBytes("001")).build(); + procedures.add(new AssignProcedure(meta1)); + procedures.add(new AssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO)); + RegionInfo meta0 = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME). + setStartKey(Bytes.toBytes("000")).build(); + procedures.add(new AssignProcedure(meta0)); + RegionInfo user2 = RegionInfoBuilder.newBuilder(TableName.valueOf("user_space2")).build(); + procedures.add(new AssignProcedure(user2)); + RegionInfo system = RegionInfoBuilder.newBuilder(TableName.NAMESPACE_TABLE_NAME).build(); + procedures.add(new AssignProcedure(system)); + procedures.sort(AssignProcedure.COMPARATOR); + assertTrue(procedures.get(0).getRegionInfo().equals(RegionInfoBuilder.FIRST_META_REGIONINFO)); + assertTrue(procedures.get(1).getRegionInfo().equals(meta0)); + assertTrue(procedures.get(2).getRegionInfo().equals(meta1)); + assertTrue(procedures.get(3).getRegionInfo().equals(meta2)); + assertTrue(procedures.get(4).getRegionInfo().getTable().equals(TableName.NAMESPACE_TABLE_NAME)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index 035fb9e7f0..ede9764fd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -153,7 +153,7 @@ public class TestRegionMergeTransactionOnCluster { RegionStates regionStates = am.getRegionStates(); // We should not be able to assign it again - am.assign(hri, true); + am.assign(hri); assertFalse("Merged region can't be assigned", regionStates.isRegionInTransition(hri)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 8924454ee4..92833fde61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -716,7 +716,7 @@ public class TestSplitTransactionOnCluster { assertTrue(regionStates.isRegionInState(daughters.get(1).getRegionInfo(), State.OPEN)); // We should not be able to assign it again - am.assign(hri, true); + am.assign(hri); assertFalse("Split region can't be assigned", regionStates.isRegionInTransition(hri)); assertTrue(regionStates.isRegionInState(hri, State.SPLIT)); -- 2.11.0 (Apple Git-81)