From 2833af3731bc5afae45b8408f5aff3462c1fa867 Mon Sep 17 00:00:00 2001
From: Sandeep Guggilam
Date: Wed, 20 May 2020 15:43:04 -0700
Subject: [PATCH] HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed region close requests

---
 .../hbase/master/AssignmentManager.java    | 153 ++++++++++++------
 .../TestAssignmentManagerOnCluster.java    |  73 ++++++++-
 2 files changed, 172 insertions(+), 54 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 5386f6edd3..4f1d49fe4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -37,8 +37,8 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -251,6 +251,9 @@ public class AssignmentManager extends ZooKeeperListener {
   private final ConcurrentHashMap<String, AtomicInteger> failedOpenTracker =
       new ConcurrentHashMap<String, AtomicInteger>();
 
+  private final ConcurrentHashMap<String, AtomicInteger> failedCloseTracker =
+      new ConcurrentHashMap<String, AtomicInteger>();
+
   // A flag to indicate if we are using ZK for region assignment
   private final boolean useZKForAssignment;
 
@@ -1972,6 +1975,13 @@ public class AssignmentManager extends ZooKeeperListener {
       final RegionState state, final int versionOfClosingNode, final ServerName dest,
       final boolean transitionInZK, final ServerName src) {
+    String encodedName = region.getEncodedName();
+    AtomicInteger failedCloseCount = failedCloseTracker.get(encodedName);
+    if (failedCloseCount == null) {
+      failedCloseCount = new AtomicInteger();
+      failedCloseTracker.put(encodedName, failedCloseCount);
+    }
+
     ServerName server = src;
     if (state != null) {
       server = state.getServerName();
     }
@@ -1985,7 +1995,7 @@ public class AssignmentManager extends ZooKeeperListener {
     // ClosedRegionhandler can remove the server from this.regions
     if (!serverManager.isServerOnline(server)) {
       LOG.debug("Offline " + region.getRegionNameAsString()
-        + ", no need to unassign since it's on a dead server: " + server);
+          + ", no need to unassign since it's on a dead server: " + server);
       if (transitionInZK) {
         // delete the node. if no node exists need not bother.
         deleteClosingOrClosedNode(region, server);
@@ -1997,40 +2007,37 @@ public class AssignmentManager extends ZooKeeperListener {
     }
     try {
       // Send CLOSE RPC
-      if (serverManager.sendRegionClose(server, region,
-        versionOfClosingNode, dest, transitionInZK)) {
-        LOG.debug("Sent CLOSE to " + server + " for region " +
-          region.getRegionNameAsString());
+      if (serverManager.sendRegionClose(server, region, versionOfClosingNode, dest,
+        transitionInZK)) {
+        LOG.debug("Sent CLOSE to " + server + " for region " + region.getRegionNameAsString());
         if (useZKForAssignment && !transitionInZK && state != null) {
           // Retry to make sure the region is
           // closed so as to avoid double assignment.
-          unassign(region, state, versionOfClosingNode,
-            dest, transitionInZK, src);
+          unassign(region, state, versionOfClosingNode, dest, transitionInZK, src);
         }
         return;
       }
       // This never happens. Currently regionserver close always return true.
       // Todo; this can now happen (0.96) if there is an exception in a coprocessor
-      LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
-        region.getRegionNameAsString());
+      LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
+          region.getRegionNameAsString());
     } catch (Throwable t) {
       long sleepTime = 0;
       Configuration conf = this.server.getConfiguration();
       if (t instanceof RemoteException) {
-        t = ((RemoteException)t).unwrapRemoteException();
+        t = ((RemoteException) t).unwrapRemoteException();
       }
       boolean logRetries = true;
-      if (t instanceof RegionServerAbortedException
-          || t instanceof RegionServerStoppedException
+      if (t instanceof RegionServerAbortedException || t instanceof RegionServerStoppedException
           || t instanceof ServerNotRunningYetException) {
         // RS is aborting or stopping, we cannot offline the region since the region may need
-        // to do WAL recovery.  Until we see the RS expiration, we should retry.
+        // to do WAL recovery. Until we see the RS expiration, we should retry.
         sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
           RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
       } else if (t instanceof NotServingRegionException) {
-        LOG.debug("Offline " + region.getRegionNameAsString()
-          + ", it's not any more on " + server, t);
+        LOG.debug(
+          "Offline " + region.getRegionNameAsString() + ", it's not any more on " + server, t);
         if (transitionInZK) {
           deleteClosingOrClosedNode(region, server);
         }
@@ -2038,39 +2045,38 @@ public class AssignmentManager extends ZooKeeperListener {
           regionOffline(region);
         }
         return;
-      } else if ((t instanceof FailedServerException) || (state != null &&
-          t instanceof RegionAlreadyInTransitionException)) {
-        if (t instanceof FailedServerException) {
-          sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+      } else if ((t instanceof FailedServerException)
+          || (state != null && t instanceof RegionAlreadyInTransitionException)) {
+        if (t instanceof FailedServerException) {
+          sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-        } else {
-          // RS is already processing this region, only need to update the timestamp
-          LOG.debug("update " + state + " the timestamp.");
-          state.updateTimestampToNow();
-          if (maxWaitTime < 0) {
-            maxWaitTime =
-              EnvironmentEdgeManager.currentTime()
-                + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
-                  DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
-          }
-          long now = EnvironmentEdgeManager.currentTime();
-          if (now < maxWaitTime) {
-            LOG.debug("Region is already in transition; "
-              + "waiting up to " + (maxWaitTime - now) + "ms", t);
-            sleepTime = 100;
-            i--; // reset the try count
-            logRetries = false;
+        } else {
+          // RS is already processing this region, only need to update the timestamp
+          LOG.debug("update " + state + " the timestamp.");
+          state.updateTimestampToNow();
+          if (maxWaitTime < 0) {
+            maxWaitTime = EnvironmentEdgeManager.currentTime() + conf.getLong(
+              ALREADY_IN_TRANSITION_WAITTIME, DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
+          }
+          long now = EnvironmentEdgeManager.currentTime();
+          if (now < maxWaitTime) {
+            LOG.debug("Region is already in transition; " + "waiting up to "
+                + (maxWaitTime - now) + "ms",
+              t);
+            sleepTime = 100;
+            i--; // reset the try count
+            logRetries = false;
+          }
+        }
       }
-        }
-      }
       try {
         if (sleepTime > 0) {
           Thread.sleep(sleepTime);
         }
       } catch (InterruptedException ie) {
-        LOG.warn("Failed to unassign "
-          + region.getRegionNameAsString() + " since interrupted", ie);
+        LOG.warn("Failed to unassign " + region.getRegionNameAsString() + " since interrupted",
region.getRegionNameAsString() + " since interrupted", + ie); Thread.currentThread().interrupt(); if (state != null) { regionStates.updateRegionState(region, State.FAILED_CLOSE); @@ -2079,16 +2085,29 @@ public class AssignmentManager extends ZooKeeperListener { } if (logRetries) { - LOG.info("Server " + server + " returned " + t + " for " - + region.getRegionNameAsString() + ", try=" + i - + " of " + this.maximumAttempts, t); + LOG.info("Server " + server + " returned " + t + " for " + region.getRegionNameAsString() + + ", try=" + i + " of " + this.maximumAttempts, + t); // Presume retry or server will expire. } } } - // Run out of attempts - if (state != null) { - regionStates.updateRegionState(region, State.FAILED_CLOSE); + + long sleepTime = backoffPolicy.getBackoffTime(retryConfig, + getFailedAttempts(encodedName, failedCloseTracker)); + if (failedCloseCount.incrementAndGet() <= maximumAttempts && sleepTime > 0) { + if (failedCloseTracker.containsKey(encodedName)) { + // Sleep before trying unassign if this region has failed to close before + scheduledThreadPoolExecutor.schedule(new DelayedUnAssignCallable(this, region, state, + versionOfClosingNode, dest, transitionInZK, src), + sleepTime, TimeUnit.MILLISECONDS); + } + } else { + // Run out of attempts + if (state != null) { + regionStates.updateRegionState(region, State.FAILED_CLOSE); + } + failedCloseTracker.remove(encodedName); } } @@ -3623,7 +3642,7 @@ public class AssignmentManager extends ZooKeeperListener { if (failedOpenTracker.containsKey(regionInfo.getEncodedName())) { // Sleep before reassigning if this region has failed to open before long sleepTime = backoffPolicy.getBackoffTime(retryConfig, - getFailedAttempts(regionInfo.getEncodedName())); + getFailedAttempts(regionInfo.getEncodedName(), failedOpenTracker)); invokeAssignLater(regionInfo, forceNewPlan, sleepTime); } else { // Immediately reassign if this region has never failed an open before @@ -3631,8 +3650,9 @@ public class AssignmentManager extends ZooKeeperListener { } } - private int getFailedAttempts(String regionName) { - AtomicInteger failedCount = failedOpenTracker.get(regionName); + private int getFailedAttempts(String regionName, + ConcurrentHashMap tracker) { + AtomicInteger failedCount = tracker.get(regionName); if (failedCount != null) { return failedCount.get(); } else { @@ -3996,6 +4016,7 @@ public class AssignmentManager extends ZooKeeperListener { } regionStates.updateRegionState(hri, RegionState.State.CLOSED); sendRegionClosedNotification(hri); + failedCloseTracker.remove(hri.getEncodedName()); // This below has to do w/ online enable/disable of a table removeClosedRegion(hri); invokeAssign(hri, false); @@ -4791,6 +4812,38 @@ public class AssignmentManager extends ZooKeeperListener { } } + private class DelayedUnAssignCallable implements Runnable { + AssignmentManager am; + HRegionInfo region; + RegionState state; + int versionOfClosingNode; + ServerName dest; + boolean transitionInZK; + ServerName src; + + public DelayedUnAssignCallable(AssignmentManager am, HRegionInfo region, RegionState state, + int versionOfClosingNode, ServerName dest, boolean transitionInZK, ServerName src) { + this.am = am; + this.region = region; + this.state = state; + this.versionOfClosingNode = versionOfClosingNode; + this.dest = dest; + this.transitionInZK = transitionInZK; + this.src = src; + } + + @Override + public void run() { + threadPoolExecutorService.submit(new Runnable() { + + @Override + public void run() { + am.unassign(region, state, versionOfClosingNode, 
+        }
+      });
+    }
+  }
+
   /*
    * This is only used for unit-testing split failures.
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 69dfa40bc9..6fbbde7414 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -32,11 +32,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
@@ -72,6 +71,7 @@ import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -87,6 +87,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * This tests AssignmentManager with a testing cluster.
@@ -598,6 +601,68 @@ public class TestAssignmentManagerOnCluster {
     }
   }
 
+  /**
+   * This tests region close with exponential backoff
+   */
+  @Test(timeout = 60000)
+  public void testCloseRegionWithExponentialBackOff() throws Exception {
+    String table = "testCloseRegionWithExponentialBackOff";
+    // Set the backoff time between each retry for failed close
+    TEST_UTIL.getMiniHBaseCluster().getConf().setLong("hbase.assignment.retry.sleep.initial", 1000);
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+    TEST_UTIL.getMiniHBaseCluster().startMaster(); // restart the master for the conf to take effect
+
+    try {
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
+          new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("ExponentialBackOff"));
+
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(desc);
+
+      Table meta = new HTable(conf, TableName.META_TABLE_NAME);
+      HRegionInfo hri =
+          new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
+      MetaTableAccessor.addRegionToMeta(meta, hri);
+
+      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+      AssignmentManager am = master.getAssignmentManager();
+      assertTrue(TEST_UTIL.assignRegion(hri));
+      ServerName sn = am.getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, sn, 6000);
+
+      MyRegionObserver.preCloseEnabled.set(true);
+      // Unset the preCloseEnabled flag after 1 second for the next retry to succeed
+      scheduledThreadPoolExecutor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          MyRegionObserver.preCloseEnabled.set(false);
+        }
+      }, 1000, TimeUnit.MILLISECONDS);
+      am.unassign(hri);
+
+      // region may still be assigned now since it's closing,
+      // let's check if it's assigned after it's out of transition
+      am.waitOnRegionToClearRegionsInTransition(hri);
+
+      // region should be closed and re-assigned
+      assertTrue(am.waitForAssignment(hri));
+      ServerName serverName =
+          master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, serverName, 6000);
+    } finally {
+      MyRegionObserver.preCloseEnabled.set(false);
+      TEST_UTIL.deleteTable(Bytes.toBytes(table));
+
+      // reset the backoff time to default
+      TEST_UTIL.getMiniHBaseCluster().getConf().unset("hbase.assignment.retry.sleep.initial");
+      activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster();
+      TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+      TEST_UTIL.getMiniHBaseCluster().startMaster();
+    }
+  }
+
   /**
    * This tests region open failed
    */
@@ -889,7 +954,7 @@ public class TestAssignmentManagerOnCluster {
   /**
    * This tests region close racing with open
    */
-  @Test (timeout=60000)
+  @Test(timeout = 60000)
   public void testOpenCloseRacing() throws Exception {
     String table = "testOpenCloseRacing";
     try {
--
2.24.1
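
For illustration only, and not part of the patch above: the change paces close retries by asking the assignment manager's backoff policy for a delay via backoffPolicy.getBackoffTime(retryConfig, failedAttempts) and scheduling a DelayedUnAssignCallable that many milliseconds later, instead of re-sending the CLOSE immediately. Below is a minimal, self-contained sketch of a capped exponential backoff in that spirit; the initial delay mirrors the "hbase.assignment.retry.sleep.initial" value set in the test, while the cap, the jitter, and the class name are assumptions for the example, not HBase defaults.

import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of a capped exponential backoff, similar in spirit to the
// delay the patch obtains from backoffPolicy.getBackoffTime(retryConfig, attempts).
// INITIAL_SLEEP_MS mirrors "hbase.assignment.retry.sleep.initial" from the test;
// MAX_SLEEP_MS and the jitter are hypothetical values, not HBase defaults.
public final class BackoffSketch {
  private static final long INITIAL_SLEEP_MS = 1000L;
  private static final long MAX_SLEEP_MS = 60_000L;

  /** Delay doubles with each failed attempt, is capped, and gets a little jitter. */
  static long backoffMillis(int failedAttempts) {
    long base = INITIAL_SLEEP_MS << Math.min(failedAttempts, 20); // bound the shift
    long capped = Math.min(base, MAX_SLEEP_MS);
    long jitter = ThreadLocalRandom.current().nextLong(capped / 10 + 1);
    return capped + jitter;
  }

  public static void main(String[] args) {
    for (int attempt = 0; attempt < 6; attempt++) {
      System.out.println("attempt " + attempt + " -> sleep ~" + backoffMillis(attempt) + " ms");
    }
  }
}

Capping keeps a region that repeatedly fails to close from backing off indefinitely, and a small jitter helps avoid many regions retrying in lockstep.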