From 75043178bea4e18460bcca332b4765f60e371a09 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 11 Oct 2018 15:28:36 -0700 Subject: [PATCH] HBASE-21266 Not running balancer because processing dead regionservers, but empty dead rs list --- .../hadoop/hbase/master/DeadServer.java | 95 ++++++++++++++----- .../hadoop/hbase/master/TestDeadServer.java | 4 +- .../TestEndToEndSplitTransaction.java | 6 +- .../TestSplitTransactionOnCluster.java | 23 +++-- 4 files changed, 90 insertions(+), 38 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java index c1b5180cb6..e7846b7b7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java @@ -18,12 +18,7 @@ */ package org.apache.hadoop.hbase.master; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; @@ -36,6 +31,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + /** * Class to hold dead servers list and utility querying dead server list. * On znode expiration, servers are added here. @@ -54,14 +56,9 @@ public class DeadServer { private final Map deadServers = new HashMap(); /** - * Number of dead servers currently being processed + * Set of dead servers currently being processed */ - private int numProcessing = 0; - - /** - * Whether a dead server is being processed currently. - */ - private boolean processing = false; + private final Set processingServers = new HashSet(); /** * A dead server that comes back alive has a different start code. The new start code should be @@ -76,7 +73,13 @@ public class DeadServer { while (it.hasNext()) { ServerName sn = it.next(); if (ServerName.isSameHostnameAndPort(sn, newServerName)) { + // remove from deadServers it.remove(); + // remove from processingServers + boolean removed = processingServers.remove(sn); + if (removed) { + LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size()); + } return true; } } @@ -92,6 +95,14 @@ public class DeadServer { return deadServers.containsKey(serverName); } + /** + * @param serverName server name. + * @return true if this server is on the processing servers list false otherwise + */ + public synchronized boolean isProcessingServer(final ServerName serverName) { + return processingServers.contains(serverName); + } + /** * Checks if there are currently any dead servers being processed by the * master. Returns true if at least one region server is currently being @@ -99,7 +110,9 @@ public class DeadServer { * * @return true if any RS are being processed as dead */ - public synchronized boolean areDeadServersInProgress() { return processing; } + public synchronized boolean areDeadServersInProgress() { + return !processingServers.isEmpty(); + } public synchronized Set copyServerNames() { Set clone = new HashSet(deadServers.size()); @@ -112,10 +125,13 @@ public class DeadServer { * @param sn the server name */ public synchronized void add(ServerName sn) { - processing = true; if (!deadServers.containsKey(sn)){ deadServers.put(sn, EnvironmentEdgeManager.currentTime()); } + boolean added = processingServers.add(sn); + if (LOG.isDebugEnabled() && added) { + LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size()); + } } /** @@ -123,18 +139,27 @@ public class DeadServer { * @param sn ServerName for the dead server. */ public synchronized void notifyServer(ServerName sn) { - if (LOG.isDebugEnabled()) { LOG.debug("Started processing " + sn); } - processing = true; - numProcessing++; + boolean added = processingServers.add(sn); + if (LOG.isDebugEnabled()) { + if (added) { + LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size()); + } + LOG.debug("Started processing " + sn + "; numProcessing=" + processingServers.size()); + } } + /** + * Complete processing for this dead server. + * @param sn ServerName for the dead server. + */ public synchronized void finish(ServerName sn) { - numProcessing--; - if (LOG.isDebugEnabled()) LOG.debug("Finished " + sn + "; numProcessing=" + numProcessing); - - assert numProcessing >= 0: "Number of dead servers in processing should always be non-negative"; - - if (numProcessing == 0) { processing = false; } + boolean removed = processingServers.remove(sn); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished processing " + sn + "; numProcessing=" + processingServers.size()); + if (removed) { + LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size()); + } + } } public synchronized int size() { @@ -150,20 +175,37 @@ public class DeadServer { while (it.hasNext()) { ServerName sn = it.next(); if (ServerName.isSameHostnameAndPort(sn, newServerName)) { + // remove from deadServers it.remove(); + // remove from processingServers + boolean removed = processingServers.remove(sn); + if (removed) { + LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size()); + } } } } @Override public synchronized String toString() { + // Display unified set of servers from both maps + Set servers = new HashSet(); + servers.addAll(deadServers.keySet()); + servers.addAll(processingServers); StringBuilder sb = new StringBuilder(); - for (ServerName sn : deadServers.keySet()) { + for (ServerName sn : servers) { if (sb.length() > 0) { sb.append(", "); } sb.append(sn.toString()); + // Star entries that are being processed + if (processingServers.contains(sn)) { + sb.append("*"); + } } + sb.append(" (numProcessing="); + sb.append(processingServers.size()); + sb.append(')'); return sb.toString(); } @@ -210,6 +252,9 @@ public class DeadServer { * @return true if this server was removed */ public synchronized boolean removeDeadServer(final ServerName deadServerName) { + Preconditions.checkState(!processingServers.contains(deadServerName), + "Asked to remove server still in processingServers set " + deadServerName + + " (numProcessing=" + processingServers.size() + ")"); if (deadServers.remove(deadServerName) == null) { return false; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java index f1a01c5301..8876e705be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java @@ -116,7 +116,6 @@ public class TestDeadServer { DeadServer d = new DeadServer(); - d.add(hostname123); mee.incValue(1); d.add(hostname1234); @@ -157,14 +156,17 @@ public class TestDeadServer { d.add(hostname1234); Assert.assertEquals(2, d.size()); + d.finish(hostname123); d.removeDeadServer(hostname123); Assert.assertEquals(1, d.size()); + d.finish(hostname1234); d.removeDeadServer(hostname1234); Assert.assertTrue(d.isEmpty()); d.add(hostname1234); Assert.assertFalse(d.removeDeadServer(hostname123_2)); Assert.assertEquals(1, d.size()); + d.finish(hostname1234); Assert.assertTrue(d.removeDeadServer(hostname1234)); Assert.assertTrue(d.isEmpty()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 86662a4dbc..1892811ee0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -459,7 +459,7 @@ public class TestEndToEndSplitTransaction { Throwable ex; RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException { - super("RegionChecker", stopper, 10); + super("RegionChecker", stopper, 100); this.conf = conf; this.tableName = tableName; @@ -669,7 +669,7 @@ public class TestEndToEndSplitTransaction { log("found region in META: " + hri.getRegionNameAsString()); break; } - Threads.sleep(10); + Threads.sleep(100); } } @@ -690,7 +690,7 @@ public class TestEndToEndSplitTransaction { } catch (IOException ex) { // wait some more } - Threads.sleep(10); + Threads.sleep(100); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 32700af1f8..fd1527bb7e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -1380,15 +1380,20 @@ public class TestSplitTransactionOnCluster { regionServer.kill(); cluster.getRegionServerThreads().get(serverWith).join(); // Wait until finish processing of shutdown - while (cluster.getMaster().getServerManager().areDeadServersInProgress()) { - Thread.sleep(10); - } - AssignmentManager am = cluster.getMaster().getAssignmentManager(); - while(am.getRegionStates().isRegionsInTransition()) { - Thread.sleep(10); - } - assertEquals(am.getRegionStates().getRegionsInTransition().toString(), 0, am - .getRegionStates().getRegionsInTransition().size()); + TESTING_UTIL.waitFor(60000, 1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return !cluster.getMaster().getServerManager().areDeadServersInProgress(); + } + }); + // Wait until there are no more regions in transition + TESTING_UTIL.waitFor(60000, 1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return !cluster.getMaster().getAssignmentManager(). + getRegionStates().isRegionsInTransition(); + } + }); regionDirs = FSUtils.getRegionDirs(tableDir.getFileSystem(cluster.getConfiguration()), tableDir); assertEquals(1,regionDirs.size()); -- 2.19.0