diff --git a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 901be8b..6ecfeb4 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.HServerInfo;
  */
 public class LoadBalancer {
   private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
-  private static final Random rand = new Random();
+  private static final Random RANDOM = new Random(System.currentTimeMillis());
 
   static class RegionPlanComparator implements Comparator<RegionPlan> {
     @Override
@@ -195,7 +195,7 @@
         break;
       }
       serversOverloaded++;
-      List<HRegionInfo> regions = server.getValue();
+      List<HRegionInfo> regions = randomize(server.getValue());
       int numToOffload = Math.min(regionCount - max, regions.size());
       int numTaken = 0;
       for (int i = regions.size() - 1; i >= 0; i--) {
@@ -209,8 +209,6 @@
       serverBalanceInfo.put(serverInfo,
           new BalanceInfo(numToOffload, (-1)*numTaken));
     }
-    // put young regions at the beginning of regionsToMove
-    Collections.sort(regionsToMove, rpComparator);
 
     // Walk down least loaded, filling each to the min
     int serversUnderloaded = 0; // number of servers that get new regions
@@ -336,6 +334,15 @@
   }
 
   /**
+   * @param regions
+   * @return Randomization of passed regions
+   */
+  static List<HRegionInfo> randomize(final List<HRegionInfo> regions) {
+    Collections.shuffle(regions, RANDOM);
+    return regions;
+  }
+
+  /**
    * Stores additional per-server information about the regions added/removed
    * during the run of the balancing algorithm.
    *
@@ -396,7 +403,7 @@
     int max = (int)Math.ceil((float)numRegions/numServers);
     int serverIdx = 0;
     if (numServers > 1) {
-      serverIdx = rand.nextInt(numServers);
+      serverIdx = RANDOM.nextInt(numServers);
     }
     int regionIdx = 0;
     for (int j = 0; j < numServers; j++) {
@@ -444,7 +451,7 @@
       if (server != null) {
         assignments.get(server).add(region.getKey());
       } else {
-        assignments.get(servers.get(rand.nextInt(assignments.size()))).add(
+        assignments.get(servers.get(RANDOM.nextInt(assignments.size()))).add(
           region.getKey());
       }
     }
@@ -575,7 +582,7 @@
     Map<HRegionInfo, HServerInfo> assignments =
       new TreeMap<HRegionInfo, HServerInfo>();
     for(HRegionInfo region : regions) {
-      assignments.put(region, servers.get(rand.nextInt(servers.size())));
+      assignments.put(region, servers.get(RANDOM.nextInt(servers.size())));
     }
     return assignments;
   }
@@ -585,7 +592,7 @@
       LOG.warn("Wanted to do random assignment but no servers to assign to");
       return null;
     }
-    return servers.get(rand.nextInt(servers.size()));
+    return servers.get(RANDOM.nextInt(servers.size()));
   }
 
   /**
diff --git a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 48cbaad..224ecca 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -43,6 +43,8 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
@@ -631,8 +633,22 @@
     boolean recovered = false;
     while (!recovered) {
       try {
-        FSDataOutputStream out = fs.append(p);
-        out.close();
+        try {
+          if (fs instanceof DistributedFileSystem) {
+            DistributedFileSystem dfs = (DistributedFileSystem)fs;
+            DistributedFileSystem.class.getMethod("recoverLease",
+              new Class[] {Path.class}).invoke(dfs, p);
+          } else {
+            throw new Exception("Not a DistributedFileSystem");
+          }
+        } catch (InvocationTargetException ite) {
+          // function was properly called, but threw its own exception
+          throw (IOException) ite.getCause();
+        } catch (Exception e) {
+          LOG.debug("Could not call recoverLease, trying append: ", e);
+          FSDataOutputStream out = fs.append(p);
+          out.close();
+        }
         recovered = true;
       } catch (IOException e) {
         e = RemoteExceptionHandler.checkIOException(e);
@@ -646,11 +662,6 @@
             LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p +
               ":" + e.getMessage());
           }
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ex) {
-            // ignore it and try again
-          }
         } else if (e instanceof LeaseExpiredException &&
             e.getMessage().contains("File does not exist")) {
           // This exception comes out instead of FNFE, fix it
@@ -660,8 +671,12 @@
           throw new IOException("Failed to open " + p + " for append", e);
         }
       }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ex) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(ex);
+      }
     }
     LOG.info("Finished lease recover attempt for " + p);
   }
-
-}
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java b/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
index ca2a4bc..9568b14 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -34,6 +35,8 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tools.ant.taskdefs.PathConvert.MapEntry;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -134,6 +138,38 @@ public class TestLoadBalancer {
       new int [] { 12, 100 },
   };
 
+  @Test
+  public void testRandomizer() {
+    for(int [] mockCluster : clusterStateMocks) {
+      if (mockCluster.length < 5) continue;
+      Map<HServerInfo, List<HRegionInfo>> servers =
+        mockClusterServers(mockCluster);
+      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: servers.entrySet()) {
+        List<HRegionInfo> original = e.getValue();
+        if (original.size() < 5) continue;
+        // Try ten times in case random chances upon original order more than
+        // one or two times in a row.
+        boolean same = true;
+        for (int i = 0; i < 10 && same; i++) {
+          List<HRegionInfo> copy = new ArrayList<HRegionInfo>(original);
+          System.out.println("Randomizing before " + copy.size());
+          for (HRegionInfo hri: copy) {
+            System.out.println(hri.getEncodedName());
+          }
+          List<HRegionInfo> randomized = LoadBalancer.randomize(copy);
+          System.out.println("Randomizing after " + randomized.size());
+          for (HRegionInfo hri: randomized) {
+            System.out.println(hri.getEncodedName());
+          }
+          if (original.equals(randomized)) continue;
+          same = false;
+          break;
+        }
+        assertFalse(same);
+      }
+    }
+  }
+
   /**
    * Test the load balancing algorithm.
    *
@@ -410,7 +446,7 @@
       Bytes.putInt(start, 0, numRegions << 1);
       Bytes.putInt(end, 0, (numRegions << 1) + 1);
       HRegionInfo hri = new HRegionInfo(
-          new HTableDescriptor(Bytes.toBytes("table")), start, end);
+          new HTableDescriptor(Bytes.toBytes("table" + i)), start, end);
       regions.add(hri);
     }
     return regions;