diff --git src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 7defac0..249bf4a 100644 --- src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -242,8 +242,9 @@ public class LocalHBaseCluster { new ArrayList(); List list = getRegionServers(); for (JVMClusterUtil.RegionServerThread rst: list) { - if (rst.isAlive()) liveServers.add(rst); - else LOG.info("Not alive " + rst.getName()); + if (rst.isAlive()){ + liveServers.add(rst); + } } return liveServers; } diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 2d47fce..39e6a4e 100644 --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; @@ -583,6 +582,7 @@ public class AssignmentManager extends ZooKeeperListener { // So we will assign the ROOT and .META. region immediately. 
processOpeningState(regionInfo); break; + } regionsInTransition.put(encodedRegionName, getRegionState(regionInfo, RegionState.State.OPENING, rt)); @@ -1847,29 +1847,14 @@ public class AssignmentManager extends ZooKeeperListener { final ServerName serverToExclude, final boolean forceNewPlan) { // Pickup existing plan or make a new one final String encodedName = state.getRegion().getEncodedName(); - final List servers = this.serverManager.getOnlineServersList(); - final List drainingServers = this.serverManager.getDrainingServersList(); - - - if (serverToExclude != null) servers.remove(serverToExclude); + final List destServers = serverManager.createDestinationServersList(serverToExclude); - // Loop through the draining server list and remove them from the server - // list. - if (!drainingServers.isEmpty()) { - for (final ServerName server: drainingServers) { - LOG.debug("Removing draining server: " + server + - " from eligible server pool."); - servers.remove(server); - } + if (destServers.isEmpty()){ + LOG.warn("Can't move the region " + encodedName + + ", there is no destination server available."); + return null; } - // Remove the deadNotExpired servers from the server list. 
- removeDeadNotExpiredServers(servers); - - - - if (servers.isEmpty()) return null; - RegionPlan randomPlan = null; boolean newPlan = false; RegionPlan existingPlan = null; @@ -1886,10 +1871,10 @@ public class AssignmentManager extends ZooKeeperListener { if (forceNewPlan || existingPlan == null || existingPlan.getDestination() == null - || drainingServers.contains(existingPlan.getDestination())) { + || !destServers.contains(existingPlan.getDestination())) { newPlan = true; randomPlan = new RegionPlan(state.getRegion(), null, - balancer.randomAssignment(state.getRegion(), servers)); + balancer.randomAssignment(state.getRegion(), destServers)); this.regionPlans.put(encodedName, randomPlan); } } @@ -1900,7 +1885,7 @@ public class AssignmentManager extends ZooKeeperListener { " so generated a random one; " + randomPlan + "; " + serverManager.countOfRegionServers() + " (online=" + serverManager.getOnlineServers().size() + - ", available=" + servers.size() + ") available servers"); + ", available=" + destServers.size() + ") available servers"); return randomPlan; } LOG.debug("Using pre-existing plan for region " + @@ -1912,17 +1897,11 @@ public class AssignmentManager extends ZooKeeperListener { * Loop through the deadNotExpired server list and remove them from the * servers. 
* @param servers + * @deprecated the method is now available in ServerManager - deprecated in 0.96 */ - public void removeDeadNotExpiredServers(List servers) { - Set deadNotExpiredServers = this.serverManager - .getDeadNotExpiredServers(); - if (!deadNotExpiredServers.isEmpty()) { - for (ServerName server : deadNotExpiredServers) { - LOG.debug("Removing dead but not expired server: " + server - + " from eligible server pool."); - servers.remove(server); - } - } + @Deprecated + void removeDeadNotExpiredServers(List servers) { + this.serverManager.removeDeadNotExpiredServers(servers); } /** @@ -2246,9 +2225,8 @@ public class AssignmentManager extends ZooKeeperListener { public void assignUserRegionsToOnlineServers(List regions) throws IOException, InterruptedException { - List servers = this.serverManager.getOnlineServersList(); - removeDeadNotExpiredServers(servers); - assignUserRegions(regions, servers); + List destServers = serverManager.createDestinationServersList(); + assignUserRegions(regions, destServers); } /** @@ -2297,13 +2275,10 @@ public class AssignmentManager extends ZooKeeperListener { */ public void assignAllUserRegions() throws IOException, InterruptedException { // Get all available servers - List servers = serverManager.getOnlineServersList(); - - // Remove the deadNotExpired servers from the server list. - removeDeadNotExpiredServers(servers); + List destServers = serverManager.createDestinationServersList(); // If there are no servers we need not proceed with region assignment. 
- if(servers.isEmpty()) return; + if(destServers.isEmpty()) return; // Scan META for all user regions, skipping any disabled tables Map allRegions = @@ -2317,17 +2292,17 @@ public class AssignmentManager extends ZooKeeperListener { Map> bulkPlan = null; if (retainAssignment) { // Reuse existing assignment info - bulkPlan = balancer.retainAssignment(allRegions, servers); + bulkPlan = balancer.retainAssignment(allRegions, destServers); } else { // assign regions in round-robin fashion - assignUserRegions(new ArrayList(allRegions.keySet()), servers); + assignUserRegions(new ArrayList(allRegions.keySet()), destServers); for (HRegionInfo hri : allRegions.keySet()) { setEnabledTable(hri); } return; } LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " + - servers.size() + " server(s), retainAssignment=" + retainAssignment); + destServers.size() + " server(s), retainAssignment=" + retainAssignment); // Use fixed count thread pool assigning. BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this); @@ -2947,8 +2922,7 @@ public class AssignmentManager extends ZooKeeperListener { protected void chore() { // If bulkAssign in progress, suspend checks if (this.bulkAssign) return; - boolean allRSsOffline = this.serverManager.getOnlineServersList(). 
- isEmpty(); + boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty(); synchronized (regionsInTransition) { // Iterate all regions in transition checking for time outs @@ -2957,14 +2931,14 @@ public class AssignmentManager extends ZooKeeperListener { if (regionState.getStamp() + timeout <= now) { //decide on action upon timeout actOnTimeOut(regionState); - } else if (this.allRegionServersOffline && !allRSsOffline) { + } else if (this.allRegionServersOffline && !noRSAvailable) { // if some RSs just came back online, we can start the // the assignment right away actOnTimeOut(regionState); } } } - setAllRegionServersOffline(allRSsOffline); + setAllRegionServersOffline(noRSAvailable); } private void actOnTimeOut(RegionState regionState) { @@ -3283,7 +3257,6 @@ public class AssignmentManager extends ZooKeeperListener { * Run through remaining regionservers and unassign all catalog regions. */ void unassignCatalogRegions() { - this.servers.entrySet(); synchronized (this.regions) { for (Map.Entry> e: this.servers.entrySet()) { Set regions = e.getValue(); diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0ad9b18..95f5128 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableEventHandler; import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; -import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -1200,38 +1199,41 @@ Server { if (p == null) throw new 
UnknownRegionException(Bytes.toStringBinary(encodedRegionName)); HRegionInfo hri = p.getFirst(); - ServerName dest = null; + ServerName dest; if (destServerName == null || destServerName.length == 0) { LOG.info("Passed destination servername is null/empty so " + "choosing a server at random"); - this.assignmentManager.clearRegionPlan(hri); - // Unassign will reassign it elsewhere choosing random server. - this.assignmentManager.unassign(hri); + final List destServers = this.serverManager.createDestinationServersList( + p.getSecond()); + dest = balancer.randomAssignment(hri, destServers); } else { dest = new ServerName(Bytes.toString(destServerName)); if (dest.equals(p.getSecond())) { LOG.debug("Skipping move of region " + hri.getRegionNameAsString() - + " because region already assigned to the same server " + dest +"."); + + " because region already assigned to the same server " + dest + "."); return; } - try { - if (this.cpHost != null) { - if (this.cpHost.preMove(p.getFirst(), p.getSecond(), dest)) { - return; - } - } - RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest); - LOG.info("Added move plan " + rp + ", running balancer"); - this.assignmentManager.balance(rp); - if (this.cpHost != null) { - this.cpHost.postMove(p.getFirst(), p.getSecond(), dest); + } + + // Now we can do the move + RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest); + + try { + if (this.cpHost != null) { + if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) { + return; } - } catch (IOException ioe) { - UnknownRegionException ure = new UnknownRegionException( - Bytes.toStringBinary(encodedRegionName)); - ure.initCause(ioe); - throw ure; } + LOG.info("Added move plan " + rp + ", running balancer"); + this.assignmentManager.balance(rp); + if (this.cpHost != null) { + this.cpHost.postMove(hri, rp.getSource(), rp.getDestination()); + } + } catch (IOException ioe) { + UnknownRegionException ure = new UnknownRegionException( + 
Bytes.toStringBinary(encodedRegionName)); + ure.initCause(ioe); + throw ure; } } diff --git src/main/java/org/apache/hadoop/hbase/master/ServerManager.java src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 29e1066..13eb9d4 100644 --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -205,8 +205,8 @@ public class ServerManager { } /** - * Checks if the clock skew between the server and the master. If the clock skew exceeds the - * configured max, it will throw an exception; if it exceeds the configured warning threshold, + * Checks if the clock skew between the server and the master. If the clock skew exceeds the + * configured max, it will throw an exception; if it exceeds the configured warning threshold, * it will log a warning but start normally. * @param serverName Incoming servers's name * @param serverCurrentTime @@ -223,7 +223,7 @@ public class ServerManager { throw new ClockOutOfSyncException(message); } else if (skew > warningSkew){ String message = "Reported time for server " + serverName + " is out of sync with master " + - "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is " + maxSkew + "ms)"; LOG.warn(message); } @@ -687,4 +687,55 @@ public class ServerManager { } } } + + /** + * Creates a list of possible destinations for a region. It contains the online servers, but not + * the draining or dying servers. 
+   * @param serverToExclude can be null if there is no server to exclude + */ + public List createDestinationServersList(final ServerName serverToExclude){ + final List destServers = getOnlineServersList(); + + if (serverToExclude != null){ + destServers.remove(serverToExclude); + } + + // Loop through the draining server list and remove them from the server list + final List drainingServersCopy = getDrainingServersList(); + if (!drainingServersCopy.isEmpty()) { + for (final ServerName server: drainingServersCopy) { + destServers.remove(server); + } + } + + // Remove the deadNotExpired servers from the server list. + removeDeadNotExpiredServers(destServers); + + return destServers; + } + + /** + * Calls {@link #createDestinationServersList} without server to exclude. + */ + public List createDestinationServersList(){ + return createDestinationServersList(null); + } + + /** + * Loop through the deadNotExpired server list and remove them from the + * servers. + * This function should be used carefully outside of this class. You should use a high level + * method such as {@link #createDestinationServersList()} instead of managing your own list. + */ + void removeDeadNotExpiredServers(List servers) { + Set deadNotExpiredServersCopy = this.getDeadNotExpiredServers(); + if (!deadNotExpiredServersCopy.isEmpty()) { + for (ServerName server : deadNotExpiredServersCopy) { + LOG.debug("Removing dead but not expired server: " + server + + " from eligible server pool."); + servers.remove(server); + } + } + } + } diff --git src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index 2bff695..e3a2326 100644 --- src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -181,9 +181,7 @@ public class CreateTableHandler extends EventHandler { } // 4. 
Trigger immediate assignment of the regions in round-robin fashion - List servers = serverManager.getOnlineServersList(); - // Remove the deadNotExpired servers from the server list. - assignmentManager.removeDeadNotExpiredServers(servers); + List servers = serverManager.createDestinationServersList(); try { this.assignmentManager.assignUserRegions(Arrays.asList(newRegions), servers); diff --git src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index a6616cc..33cfb09 100644 --- src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -311,7 +311,7 @@ public class ServerShutdownHandler extends EventHandler { } // Get all available servers List availableServers = services.getServerManager() - .getOnlineServersList(); + .createDestinationServersList(); this.services.getAssignmentManager().assign(toAssignRegions, availableServers); } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f7ac81a..9d3898c 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3426,7 +3426,7 @@ public class HRegionServer implements ClientProtocol, } } LOG.info("Received request to open region: " - + region.getRegionNameAsString()); + + region.getRegionNameAsString() + " on "+this.serverNameFromMasterPOV); HTableDescriptor htd = this.tableDescriptors.get(region.getTableName()); this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), true); // Need to pass the expected version in the constructor. 
diff --git src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java index e5de603..b526c01 100644 --- src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java +++ src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import junit.framework.Assert; @@ -27,13 +29,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSTableDescriptors; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -55,13 +59,14 @@ public class TestDrainingServer { private static final byte [] TABLENAME = Bytes.toBytes("t"); private static final byte [] FAMILY = Bytes.toBytes("f"); private static final int COUNT_OF_REGIONS = HBaseTestingUtility.KEYS.length; + private static final int NB_SLAVES = 5; /** * Spin up a cluster with a bunch of regions on it. 
*/ @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(5); + TEST_UTIL.startMiniCluster(NB_SLAVES); TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL); HTableDescriptor htd = new HTableDescriptor(TABLENAME); @@ -74,14 +79,28 @@ public class TestDrainingServer { createTableDescriptor(fs, FSUtils.getRootDir(TEST_UTIL.getConfiguration()), htd); // Assign out the regions we just created. HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); admin.disableTable(TABLENAME); admin.enableTable(TABLENAME); - ZKAssign.blockUntilNoRIT(zkw); - // Assert that every regionserver has some regions on it. - MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); - for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { - HRegionServer hrs = cluster.getRegionServer(i); - Assert.assertFalse(ProtobufUtil.getOnlineRegions(hrs).isEmpty()); + + boolean ready = false; + while (!ready){ + ZKAssign.blockUntilNoRIT(zkw); + + // Assert that every regionserver has some regions on it. + int i = 0; + ready = true; + while (i< cluster.getRegionServerThreads().size() && ready){ + HRegionServer hrs = cluster.getRegionServer(i); + if (ProtobufUtil.getOnlineRegions(hrs).isEmpty()){ + ready = false; + } + } + + if (!ready){ + admin.balancer(); + Thread.sleep(100); + } } } @@ -159,42 +178,96 @@ public class TestDrainingServer { * @throws IOException */ @Test (timeout=30000) - public void testDrainingServerWithAbort() throws KeeperException, IOException { - // Add first server to draining servers up in zk. 
- HRegionServer drainingServer = - setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)); + public void testDrainingServerWithAbort() throws KeeperException, Exception { + + // Ensure a stable env + TEST_UTIL.getHBaseAdmin().balanceSwitch(false); + waitForAllRegionsOnline(); + + final long regionCount = TEST_UTIL.getMiniHBaseCluster().countServedRegions(); + + // Let's get a copy of the regions today. + Collection regions = new ArrayList(); + for (int i = 0; i < NB_SLAVES; i++) { + HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i); + regions.addAll( hrs.getCopyOfOnlineRegionsSortedBySize().values() ); + } + + // Choose the draining server + HRegionServer drainingServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + final int regionsOnDrainingServer = drainingServer.getNumberOfOnlineRegions(); + Assert.assertTrue(regionsOnDrainingServer > 0); + + ServerManager sm = TEST_UTIL.getHBaseCluster().getMaster().getServerManager(); + + Collection regionsBefore = drainingServer. + getCopyOfOnlineRegionsSortedBySize().values(); + LOG.info("Regions of drained server are: "+ regionsBefore ); + try { - final int regionsOnDrainingServer = - drainingServer.getNumberOfOnlineRegions(); - Assert.assertTrue(regionsOnDrainingServer > 0); + // Add first server to draining servers up in zk. + setDrainingServer(drainingServer); + + //wait for the master to receive and manage the event + while (sm.createDestinationServersList().contains(drainingServer.getServerName())) ; + + LOG.info("The available servers are: "+ sm.createDestinationServersList()); + + Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer, + drainingServer.getNumberOfOnlineRegions()); + Assert.assertTrue("We should not have regions in transition here.", + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager(). + getRegionsInTransition().isEmpty() ); + // Kill a few regionservers. 
- int aborted = 0; - final int numberToAbort = 2; - for (int i = 1; i < TEST_UTIL.getMiniHBaseCluster().countServedRegions(); i++) { - HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i); - if (hrs.getServerName().equals(drainingServer.getServerName())) continue; + for (int aborted = 0; aborted <= 2; aborted++) { + HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(aborted+1); hrs.abort("Aborting"); - aborted++; - if (aborted >= numberToAbort) break; } - // Wait for regions to come back on line again. + + // Wait for regions to come back online again. waitForAllRegionsOnline(); - // Assert the draining server still has the same number of regions. - Assert.assertEquals(regionsOnDrainingServer, - drainingServer.getNumberOfOnlineRegions()); + + Collection regionsAfter = + drainingServer.getCopyOfOnlineRegionsSortedBySize().values(); + LOG.info("Regions of drained server are: "+ regionsAfter ); + + Assert.assertEquals("Test conditions are not met: regions were" + + " created/deleted during the test. ", + regionCount, TEST_UTIL.getMiniHBaseCluster().countServedRegions()); + + // Assert the draining server still has the same regions. + StringBuilder result = new StringBuilder(); + for (HRegion r: regionsAfter){ + if (!regionsBefore.contains(r)){ + result.append(r).append(" was added after the drain"); + if (regions.contains(r)){ + result.append("(existing region)"); + } else { + result.append("(new region)"); + } + result.append("; "); + } + } + for (HRegion r: regionsBefore){ + if (!regionsAfter.contains(r)){ + result.append(r).append(" was removed after the drain; "); + } + } + Assert.assertTrue("Errors are: "+ result.toString(), result.length()==0); + } finally { unsetDrainingServer(drainingServer); } } private void waitForAllRegionsOnline() { - while (TEST_UTIL.getMiniHBaseCluster().getMaster(). - getAssignmentManager().isRegionsInTransition()) { - Threads.sleep(10); - } // Wait for regions to come back on line again. 
while (!isAllRegionsOnline()) { - Threads.sleep(10); + } + + while (TEST_UTIL.getMiniHBaseCluster().getMaster(). + getAssignmentManager().isRegionsInTransition()) { } }