Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1551457) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -2505,8 +2505,7 @@ + totalServers + " server(s), " + message); // Use fixed count thread pool assigning. - BulkAssigner ba = new GeneralBulkAssigner( - this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned); + BulkAssigner ba = new BulkEnabler(this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned); ba.bulkAssign(); LOG.info("Bulk assigning done"); } @@ -3488,4 +3487,11 @@ // remove the region plan as well just in case. clearRegionPlan(regionInfo); } + + /** + * @return Instance of load balancer + */ + public LoadBalancer getBalancer() { + return this.balancer; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (revision 1551457) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (working copy) @@ -559,7 +559,9 @@ } else { // multiple new servers in the cluster on this same host int size = localServers.size(); - ServerName target = localServers.get(RANDOM.nextInt(size)); + ServerName target = + localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM + .nextInt(size)); assignments.get(target).add(region); numRetainedAssigments++; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkEnabler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkEnabler.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkEnabler.java (working copy) @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.RegionState.State; + +/** + * Run bulk assign. Does one RCP per regionserver passing a + * batch of regions using {@link BulkEnabler.SingleServerBulkAssigner}. + */ +@InterfaceAudience.Private +public class BulkEnabler extends BulkAssigner { + private static final Log LOG = LogFactory.getLog(BulkEnabler.class); + + private Map> failedPlans + = new ConcurrentHashMap>(); + private ExecutorService pool; + + final Map> bulkPlan; + final AssignmentManager assignmentManager; + final boolean waitTillAllAssigned; + + public BulkEnabler(final Server server, + final Map> bulkPlan, + final AssignmentManager am, final boolean waitTillAllAssigned) { + super(server); + this.bulkPlan = bulkPlan; + this.assignmentManager = am; + this.waitTillAllAssigned = waitTillAllAssigned; + } + + @Override + protected String getThreadNamePrefix() { + return this.server.getServerName() + "-GeneralBulkAssigner"; + } + + @Override + protected void populatePool(ExecutorService pool) { + this.pool = pool; // shut it down later in case some assigner hangs + for (Map.Entry> e: this.bulkPlan.entrySet()) { + pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(), + this.assignmentManager, this.failedPlans)); + } + } + + /** + * + * @param timeout How long to wait. + * @return true if done. + */ + @Override + protected boolean waitUntilDone(final long timeout) + throws InterruptedException { + Set regionSet = new HashSet(); + for (List regionList : bulkPlan.values()) { + regionSet.addAll(regionList); + } + + pool.shutdown(); // no more task allowed + int serverCount = bulkPlan.size(); + int regionCount = regionSet.size(); + long startTime = System.currentTimeMillis(); + long rpcWaitTime = startTime + timeout; + while (!server.isStopped() && !pool.isTerminated() + && rpcWaitTime > System.currentTimeMillis()) { + if (failedPlans.isEmpty()) { + pool.awaitTermination(100, TimeUnit.MILLISECONDS); + } else { + reassignFailedPlans(); + } + } + if (!pool.isTerminated()) { + LOG.warn("bulk assigner is still running after " + + (System.currentTimeMillis() - startTime) + "ms, shut it down now"); + // some assigner hangs, can't wait any more, shutdown the pool now + List notStarted = pool.shutdownNow(); + if (notStarted != null && !notStarted.isEmpty()) { + server.abort("some single server assigner hasn't started yet" + + " when the bulk assigner timed out", null); + return false; + } + } + + int reassigningRegions = 0; + if (!failedPlans.isEmpty() && !server.isStopped()) { + reassigningRegions = reassignFailedPlans(); + } + + Configuration conf = server.getConfiguration(); + long perRegionOpenTimeGuesstimate = + conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000); + long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime) + + perRegionOpenTimeGuesstimate * (reassigningRegions + 1); + RegionStates regionStates = assignmentManager.getRegionStates(); + // We're not synchronizing on regionsInTransition now because we don't use any iterator. + while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) { + Iterator regionInfoIterator = regionSet.iterator(); + while (regionInfoIterator.hasNext()) { + HRegionInfo hri = regionInfoIterator.next(); + if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, + State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { + regionInfoIterator.remove(); + } + } + if (!waitTillAllAssigned) { + // No need to wait, let assignment going on asynchronously + break; + } + if (!regionSet.isEmpty()) { + regionStates.waitForUpdate(100); + } + } + + if (LOG.isDebugEnabled()) { + long elapsedTime = System.currentTimeMillis() - startTime; + String status = "successfully"; + if (!regionSet.isEmpty()) { + status = "with " + regionSet.size() + " regions still in transition"; + } + LOG.debug("bulk assigning total " + regionCount + " regions to " + + serverCount + " servers, took " + elapsedTime + "ms, " + status); + } + return regionSet.isEmpty(); + } + + @Override + protected long getTimeoutOnRIT() { + // Guess timeout. Multiply the max number of regions on a server + // by how long we think one region takes opening. + Configuration conf = server.getConfiguration(); + long perRegionOpenTimeGuesstimate = + conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000); + int maxRegionsPerServer = 1; + for (List regionList : bulkPlan.values()) { + int size = regionList.size(); + if (size > maxRegionsPerServer) { + maxRegionsPerServer = size; + } + } + long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer + + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000) + + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime", + 30000) * bulkPlan.size(); + LOG.debug("Timeout-on-RIT=" + timeout); + return timeout; + } + + @Override + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Assigning regions in " + t.getName(), e); + } + }; + } + + private int reassignFailedPlans() { + List reassigningRegions = new ArrayList(); + for (Map.Entry> e : failedPlans.entrySet()) { + LOG.info("Failed assigning " + e.getValue().size() + + " regions to server " + e.getKey() + ", reassigning them"); + reassigningRegions.addAll(failedPlans.remove(e.getKey())); + } + RegionStates regionStates = assignmentManager.getRegionStates(); + for (HRegionInfo region : reassigningRegions) { + if (!regionStates.isRegionOnline(region)) { + assignmentManager.invokeAssign(region); + } + } + return reassigningRegions.size(); + } + + /** + * Manage bulk assigning to a server. + */ + static class SingleServerBulkAssigner implements Runnable { + private final ServerName regionserver; + private final List regions; + private final AssignmentManager assignmentManager; + private final Map> failedPlans; + + SingleServerBulkAssigner(final ServerName regionserver, + final List regions, final AssignmentManager am, + final Map> failedPlans) { + this.regionserver = regionserver; + this.regions = regions; + this.assignmentManager = am; + this.failedPlans = failedPlans; + } + + @Override + public void run() { + try { + if (!assignmentManager.assign(regionserver, regions)) { + failedPlans.put(regionserver, regions); + } + } catch (Throwable t) { + LOG.warn("Failed bulking assigning " + regions.size() + + " region(s) to " + regionserver.getServerName() + + ", and continue to bulk assign others", t); + failedPlans.put(regionserver, regions); + } + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1551457) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy) @@ -19,8 +19,9 @@ package org.apache.hadoop.hbase.master.handler; import java.io.IOException; -import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; @@ -38,9 +39,9 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.BulkAssigner; +import org.apache.hadoop.hbase.master.BulkEnabler; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.TableLockManager; @@ -173,22 +174,29 @@ // Set table enabling flag up in zk. this.assignmentManager.getZKTable().setEnablingTable(this.tableName); boolean done = false; + ServerManager serverManager = ((HMaster)this.server).getServerManager(); // Get the regions of this table. We're done when all listed // tables are onlined. List> tableRegionsAndLocations = MetaReader .getTableRegionsAndLocations(this.catalogTracker, tableName, true); int countOfRegionsInTable = tableRegionsAndLocations.size(); - List regions = regionsToAssignWithServerName(tableRegionsAndLocations); - int regionsCount = regions.size(); + Map regionsToAssign = + regionsToAssignWithServerName(tableRegionsAndLocations); + int regionsCount = regionsToAssign.size(); if (regionsCount == 0) { done = true; } LOG.info("Table '" + this.tableName + "' has " + countOfRegionsInTable + " regions, of which " + regionsCount + " are offline."); - BulkEnabler bd = new BulkEnabler(this.server, regions, countOfRegionsInTable, - true); + List onlineServers = serverManager.createDestinationServersList(); + Map> bulkPlan = + this.assignmentManager.getBalancer().retainAssignment(regionsToAssign, onlineServers); + LOG.info("Bulk assigning " + regionsCount + " region(s) across " + bulkPlan.size() + + " server(s), retainAssignment=true"); + + BulkAssigner ba = new BulkEnabler(this.server, bulkPlan, this.assignmentManager, true); try { - if (bd.bulkAssign()) { + if (ba.bulkAssign()) { done = true; } } catch (InterruptedException e) { @@ -214,19 +222,16 @@ * @return List of regions neither in transition nor assigned. * @throws IOException */ - private List regionsToAssignWithServerName( + private Map regionsToAssignWithServerName( final List> regionsInMeta) throws IOException { - ServerManager serverManager = ((HMaster) this.server).getServerManager(); - List regions = new ArrayList(); + Map regionsToAssign = + new HashMap(regionsInMeta.size()); RegionStates regionStates = this.assignmentManager.getRegionStates(); for (Pair regionLocation : regionsInMeta) { HRegionInfo hri = regionLocation.getFirst(); ServerName sn = regionLocation.getSecond(); if (regionStates.isRegionOffline(hri)) { - if (sn != null && serverManager.isServerOnline(sn)) { - this.assignmentManager.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, sn)); - } - regions.add(hri); + regionsToAssign.put(hri, sn); } else { if (LOG.isDebugEnabled()) { LOG.debug("Skipping assign for the region " + hri + " during enable table " @@ -234,67 +239,6 @@ } } } - return regions; + return regionsToAssign; } - - /** - * Run bulk enable. - */ - class BulkEnabler extends BulkAssigner { - private final List regions; - // Count of regions in table at time this assign was launched. - private final int countOfRegionsInTable; - - BulkEnabler(final Server server, final List regions, - final int countOfRegionsInTable, boolean retainAssignment) { - super(server); - this.regions = regions; - this.countOfRegionsInTable = countOfRegionsInTable; - } - - @Override - protected void populatePool(ExecutorService pool) throws IOException { - // In case of masterRestart always go with single assign. Going thro - // roundRobinAssignment will use bulkassign which may lead to double assignment. - for (HRegionInfo region : regions) { - if (assignmentManager.getRegionStates() - .isRegionInTransition(region)) { - continue; - } - final HRegionInfo hri = region; - pool.execute(Trace.wrap("BulkEnabler.populatePool",new Runnable() { - public void run() { - assignmentManager.assign(hri, true); - } - })); - } - } - - @Override - protected boolean waitUntilDone(long timeout) - throws InterruptedException { - long startTime = System.currentTimeMillis(); - long remaining = timeout; - List regions = null; - int lastNumberOfRegions = 0; - while (!server.isStopped() && remaining > 0) { - Thread.sleep(waitingTimeForEvents); - regions = assignmentManager.getRegionStates() - .getRegionsOfTable(tableName); - if (isDone(regions)) break; - - // Punt on the timeout as long we make progress - if (regions.size() > lastNumberOfRegions) { - lastNumberOfRegions = regions.size(); - timeout += waitingTimeForEvents; - } - remaining = timeout - (System.currentTimeMillis() - startTime); - } - return isDone(regions); - } - - private boolean isDone(final List regions) { - return regions != null && regions.size() >= this.countOfRegionsInTable; - } - } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (revision 1551457) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (working copy) @@ -880,7 +880,7 @@ for (Map.Entry entry : regions.entrySet()) { assertEquals(regions2.get(entry.getKey()), entry.getValue()); } - } + } /** * Multi-family scenario. Tests forcing split from client and Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (revision 1551457) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (working copy) @@ -1159,6 +1159,18 @@ } @Override + boolean assign(ServerName destination, List regions) { + if (enabling) { + for (HRegionInfo region : regions) { + assignmentCount++; + this.regionOnline(region, SERVERNAME_A); + } + return true; + } + return super.assign(destination, regions); + } + + @Override public void assign(List regions) throws IOException, InterruptedException { assignInvoked = (regions != null && regions.size() > 0);