diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index bf9b207..ccd9b67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -214,6 +214,7 @@ public class AssignmentManager extends ZooKeeperListener { // bulk assigning may be not as efficient. private final int bulkAssignThresholdRegions; private final int bulkAssignThresholdServers; + private final int bulkPerRegionOpenTimeGuesstimate; // Should bulk assignment wait till all regions are assigned, // or it is timed out? This is useful to measure bulk assignment @@ -255,7 +256,7 @@ public class AssignmentManager extends ZooKeeperListener { /** Listeners that are called on assignment events. */ private List listeners = new CopyOnWriteArrayList(); - + private RegionStateListener regionStateListener; /** @@ -312,6 +313,8 @@ public class AssignmentManager extends ZooKeeperListener { conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); + this.bulkPerRegionOpenTimeGuesstimate = + conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); @@ -1788,6 +1791,18 @@ public class AssignmentManager extends ZooKeeperListener { } } } + + // wait for assignment completion + ArrayList userRegionSet = new ArrayList(regions.size()); + for (HRegionInfo region: regions) { + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } + } + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } LOG.debug("Bulk assigning done for " + destination); return true; } finally { @@ -2621,18 +2636,56 @@ public class AssignmentManager extends ZooKeeperListener { */ public boolean waitForAssignment(HRegionInfo regionInfo) throws InterruptedException { - while (!regionStates.isRegionOnline(regionInfo)) { - if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN) - || this.server.isStopped()) { - return false; - } + ArrayList regionSet = new ArrayList(1); + regionSet.add(regionInfo); + return waitForAssignment(regionSet, true, Long.MAX_VALUE); + } - // We should receive a notification, but it's - // better to have a timeout to recheck the condition here: - // it lowers the impact of a race condition if any - regionStates.waitForUpdate(100); + /** + * Waits until the specified region has completed assignment, or the deadline is reached. + */ + protected boolean waitForAssignment(final Collection regionSet, + final boolean waitTillAllAssigned, final int reassigningRegions, + final long minEndTime) throws InterruptedException { + long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1); + return waitForAssignment(regionSet, waitTillAllAssigned, deadline); + } + + /** + * Waits until the specified region has completed assignment, or the deadline is reached. + * @param regionSet set of region to wait on. the set is modified and the assigned regions removed + * @param waitTillAllAssigned true if we should wait all the regions to be assigned + * @param deadline the timestamp after which the wait is aborted + * @return true if all the regions are assigned false otherwise. + */ + protected boolean waitForAssignment(final Collection regionSet, + final boolean waitTillAllAssigned, final long deadline) throws InterruptedException { + // We're not synchronizing on regionsInTransition now because we don't use any iterator. + while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) { + int failedOpenCount = 0; + Iterator regionInfoIterator = regionSet.iterator(); + while (regionInfoIterator.hasNext()) { + HRegionInfo hri = regionInfoIterator.next(); + if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, + State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { + regionInfoIterator.remove(); + } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) { + failedOpenCount++; + } + } + if (!waitTillAllAssigned) { + // No need to wait, let assignment going on asynchronously + break; + } + if (!regionSet.isEmpty()) { + if (failedOpenCount == regionSet.size()) { + // all the regions we are waiting had an error on open. + break; + } + regionStates.waitForUpdate(100); + } } - return true; + return regionSet.isEmpty(); } /** @@ -2725,15 +2778,27 @@ public class AssignmentManager extends ZooKeeperListener { LOG.trace("Not using bulk assignment since we are assigning only " + regions + " region(s) to " + servers + " server(s)"); } + + // invoke assignment (async) + ArrayList userRegionSet = new ArrayList(regions); for (Map.Entry> plan: bulkPlan.entrySet()) { if (!assign(plan.getKey(), plan.getValue())) { for (HRegionInfo region: plan.getValue()) { if (!regionStates.isRegionOnline(region)) { invokeAssign(region); + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } } } } } + + // wait for assignment completion + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } } else { LOG.info("Bulk assigning " + regions + " region(s) across " + totalServers + " server(s), " + message); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java index 356f4af..176c053 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java @@ -118,31 +118,8 @@ public class GeneralBulkAssigner extends BulkAssigner { if (!failedPlans.isEmpty() && !server.isStopped()) { reassigningRegions = reassignFailedPlans(); } - - Configuration conf = server.getConfiguration(); - long perRegionOpenTimeGuesstimate = - conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000); - long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime) - + perRegionOpenTimeGuesstimate * (reassigningRegions + 1); - RegionStates regionStates = assignmentManager.getRegionStates(); - // We're not synchronizing on regionsInTransition now because we don't use any iterator. - while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) { - Iterator regionInfoIterator = regionSet.iterator(); - while (regionInfoIterator.hasNext()) { - HRegionInfo hri = regionInfoIterator.next(); - if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, - State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { - regionInfoIterator.remove(); - } - } - if (!waitTillAllAssigned) { - // No need to wait, let assignment going on asynchronously - break; - } - if (!regionSet.isEmpty()) { - regionStates.waitForUpdate(100); - } - } + assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned, + reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime)); if (LOG.isDebugEnabled()) { long elapsedTime = System.currentTimeMillis() - startTime;