From 26118b93ef9e2dc41a42af5237a2e4af86f62410 Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Thu, 14 Feb 2019 15:59:47 +0800
Subject: [PATCH] HBASE-21875 Change the retry logic in RSProcedureDispatcher
 to 'retry by default, only if xxx'

---
 .../procedure/RSProcedureDispatcher.java      |  79 +++++----
 .../assignment/TestAssignmentManager.java     | 161 ++++++++++++------
 2 files changed, 160 insertions(+), 80 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 88a4db8fb0..3165bd00c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -22,14 +22,15 @@ import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
@@ -175,44 +176,60 @@ public class RSProcedureDispatcher
   }
 
   protected boolean scheduleForRetry(final IOException e) {
+    LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
     // Should we wait a little before retrying? If the server is starting it's yes.
-    final boolean hold = (e instanceof ServerNotRunningYetException);
-    if (hold) {
-      LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
-          serverName, numberOfAttemptsSoFar), e);
+    if (e instanceof ServerNotRunningYetException) {
       long now = EnvironmentEdgeManager.currentTime();
-      if (now < getMaxWaitTime()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("server is not yet up; waiting up to %dms",
-            (getMaxWaitTime() - now)), e);
-        }
+      long waitTime = getMaxWaitTime() - now;
+      if (waitTime > 0) {
+        LOG.warn(
+          "waiting a little before trying on the same server={}," + " try={}, waiting up to {}ms",
+          serverName, numberOfAttemptsSoFar, waitTime);
+        numberOfAttemptsSoFar++;
         submitTask(this, 100, TimeUnit.MILLISECONDS);
         return true;
       }
-
-      LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
+      LOG.warn("server {} is not up for a while; try a new one", serverName);
       return false;
     }
-
-    // In case it is a connection exception and the region server is still online,
-    // the openRegion RPC could have been accepted by the server and
-    // just the response didn't go through. So we will retry to
-    // open the region on the same server.
-    final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
-        && master.getServerManager().isServerOnline(serverName));
-    if (retry) {
-      // we want to retry as many times as needed as long as the RS is not dead.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
-            serverName, e.getMessage()), e);
-      }
-      submitTask(this, 100, TimeUnit.MILLISECONDS);
-      return true;
+    if (e instanceof DoNotRetryIOException) {
+      LOG.warn("server {} tells us not to retry due to {}, try={}, give up", serverName,
+        e.toString(), numberOfAttemptsSoFar);
+      return false;
     }
-    // trying to send the request elsewhere instead
-    LOG.warn(String.format("Failed dispatch to server=%s try=%d",
-        serverName, numberOfAttemptsSoFar), e);
-    return false;
+    // This exception is thrown in the rpc framework, where we can be sure that the call has not
+    // been executed yet, so it is safe to mark it as failed. Especially when opening a region,
+    // we'd better choose another region server.
+    // Notice that it is only safe to give up if this is the first request we send to the region
+    // server. The region server may have accepted our request the first time, with a network
+    // error then preventing us from receiving the response; if the second attempt hits a
+    // CallQueueTooBigException, it is obviously not safe to give up here, otherwise it may lead
+    // to a double assign...
+    if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
+      LOG.warn("request to {} failed due to {}, try={}, this is usually because the"
+        + " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
+      return false;
+    }
+    // Always retry for other exception types if the region server is not dead yet.
+    if (!master.getServerManager().isServerOnline(serverName)) {
+      LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up",
+        serverName, e.toString(), numberOfAttemptsSoFar);
+      return false;
+    }
+    if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
+      // A better way is to return true here to let the upper layer quit, and then schedule a
+      // background task to check whether the region server is dead. And if it is dead, call
+      // remoteCallFailed to tell the upper layer. Retrying here does not lead to an incorrect
+      // result, but it wastes some resources.
+      LOG.warn("server {} is aborted or stopped, for safety we still need to"
+        + " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
+    } else {
+      LOG.warn("request to server {} failed due to {}, try={}, retrying...", serverName,
+        e.toString(), numberOfAttemptsSoFar);
+    }
+    numberOfAttemptsSoFar++;
+    submitTask(this, 100, TimeUnit.MILLISECONDS);
+    return true;
   }
 
   private long getMaxWaitTime() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 81dac8e2a7..bf5ba2e665 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
 import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@@ -72,7 +72,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -217,25 +216,53 @@ public class TestAssignmentManager {
     }
   }
 
-  @Ignore @Test // Disabled for now. Since HBASE-18551, this mock is insufficient.
-  public void testSocketTimeout() throws Exception {
+  @Test
+  public void testAssignSocketTimeout() throws Exception {
     final TableName tableName = TableName.valueOf(this.name.getMethodName());
     final RegionInfo hri = createRegionInfo(tableName, 1);
 
     // collect AM metrics before test
     collectAssignmentManagerMetrics();
 
-    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 3));
+    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20));
     waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
 
-    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 1));
-    // exception.expect(ServerCrashException.class);
-    waitOnFuture(submitProcedure(am.createUnassignProcedure(hri, null, false)));
+    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
+  }
+
+  @Test
+  public void testAssignQueueFullOnce() throws Exception {
+    TableName tableName = TableName.valueOf(this.name.getMethodName());
+    RegionInfo hri = createRegionInfo(tableName, 1);
+
+    // collect AM metrics before test
+    collectAssignmentManagerMetrics();
+
+    rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor());
+    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
+
+    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
+  }
+
+  @Test
+  public void testTimeoutThenQueueFull() throws Exception {
+    TableName tableName = TableName.valueOf(this.name.getMethodName());
+    RegionInfo hri = createRegionInfo(tableName, 1);
+
+    // collect AM metrics before test
+    collectAssignmentManagerMetrics();
+
+    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10));
+    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
+    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15));
+    waitOnFuture(submitProcedure(am.createUnassignProcedure(hri)));
 
     assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
     assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
     assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
-    assertEquals(unassignFailedCount + 1, unassignProcMetrics.getFailedCounter().getCount());
+    assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
   }
 
   @Test
@@ -264,44 +291,27 @@ public class TestAssignmentManager {
     // Assign the region (without problems)
     rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
     waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
-
-    // TODO: Currently unassign just keeps trying until it sees a server crash.
-    // There is no count on unassign.
-    /*
-    // Test Unassign operation failure
-    rsDispatcher.setMockRsExecutor(executor);
-    waitOnFuture(submitProcedure(am.createUnassignProcedure(hri, null, false)));
-
-    assertEquals(assignSubmittedCount + 2, assignProcMetrics.getSubmittedCounter().getCount());
-    assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
-    assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
-
-    // TODO: We supposed to have 1 failed assign, 1 successful assign and a failed unassign
-    // operation. But ProcV2 framework marks aborted unassign operation as success. Fix it!
-    assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
-    */
   }
 
-  @Test
-  public void testIOExceptionOnAssignment() throws Exception {
+  public void testDoNotRetryExceptionOnAssignment() throws Exception {
     // collect AM metrics before test
     collectAssignmentManagerMetrics();
 
-    testFailedOpen(TableName.valueOf("testExceptionOnAssignment"),
-      new FaultyRsExecutor(new IOException("test fault")));
+    testFailedOpen(TableName.valueOf("testDoNotRetryExceptionOnAssignment"),
+      new FaultyRsExecutor(new DoNotRetryIOException("test do not retry fault")));
 
     assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
     assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
   }
 
   @Test
-  public void testDoNotRetryExceptionOnAssignment() throws Exception {
+  public void testCallQueueTooBigExceptionOnAssignment() throws Exception {
     // collect AM metrics before test
     collectAssignmentManagerMetrics();
 
-    testFailedOpen(TableName.valueOf("testDoNotRetryExceptionOnAssignment"),
-      new FaultyRsExecutor(new DoNotRetryIOException("test do not retry fault")));
+    testFailedOpen(TableName.valueOf("testCallQueueTooBigExceptionOnAssignment"),
+      new FaultyRsExecutor(new CallQueueTooBigException("test do not retry fault")));
 
     assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
     assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
@@ -608,18 +618,14 @@ public class TestAssignmentManager {
       throw exception;
     }
   }
-
-  private class SocketTimeoutRsExecutor extends GoodRsExecutor {
-    private final int maxSocketTimeoutRetries;
-    private final int maxServerRetries;
+  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
+    private final int timeoutTimes;
 
     private ServerName lastServer;
-    private int sockTimeoutRetries;
-    private int serverRetries;
+    private int retries;
 
-    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
-      this.maxServerRetries = maxServerRetries;
-      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
+    public SocketTimeoutRsExecutor(int timeoutTimes) {
+      this.timeoutTimes = timeoutTimes;
     }
 
     @Override
@@ -627,22 +633,79 @@ public class TestAssignmentManager {
     public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
         throws IOException {
       // SocketTimeoutException should be a temporary problem
       // unless the server will be declared dead.
-      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
-        if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
+      retries++;
+      if (retries == 1) {
         lastServer = server;
-        LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
-        throw new SocketTimeoutException("simulate socket timeout");
-      } else if (serverRetries++ < maxServerRetries) {
-        LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
+      }
+      if (retries <= timeoutTimes) {
+        LOG.debug("Socket timeout for server=" + server + " retries=" + retries);
+        // should not change the server if the server is not dead yet.
+        assertEquals(lastServer, server);
+        if (retries == timeoutTimes) {
+          LOG.info("Mark server=" + server + " as dead. retries=" + retries);
          master.getServerManager().moveFromOnlineToDeadServers(server);
-        sockTimeoutRetries = 0;
+        }
         throw new SocketTimeoutException("simulate socket timeout");
       } else {
+        // should select another server
+        assertNotEquals(lastServer, server);
         return super.sendRequest(server, req);
       }
     }
   }
 
+  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
+
+    private boolean invoked = false;
+
+    private ServerName lastServer;
+
+    @Override
+    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
+        throws IOException {
+      if (!invoked) {
+        lastServer = server;
+        invoked = true;
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      // We'd better select another server since this one is overloaded, but anyway, it is fine
+      // to still select the same server since it is not dead yet...
+      if (lastServer.equals(server)) {
+        LOG.warn("We still selected the same server, which is not ideal.");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
+  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
+
+    private final int queueFullTimes;
+
+    private int retries;
+
+    private ServerName lastServer;
+
+    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
+      this.queueFullTimes = queueFullTimes;
+    }
+
+    @Override
+    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
+        throws IOException {
+      retries++;
+      if (retries == 1) {
+        lastServer = server;
+        throw new CallTimeoutException("simulate call timeout");
+      }
+      // should always retry on the same server
+      assertEquals(lastServer, server);
+      if (retries < queueFullTimes) {
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
   /**
    * Takes open request and then returns nothing so acts like a RS that went zombie.
    * No response (so proc is stuck/suspended on the Master and won't wake up.). We
-- 
2.17.1
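
The decision order implemented by the new scheduleForRetry above can be condensed into the
following standalone Java sketch. It is an illustration only, not code from this patch: the
RetryPolicySketch class, the shouldRetry helper and its parameters are hypothetical, and the
ServerNotRunningYetException wait-and-retry path is omitted for brevity.

  import java.io.IOException;
  import org.apache.hadoop.hbase.CallQueueTooBigException;
  import org.apache.hadoop.hbase.DoNotRetryIOException;

  public final class RetryPolicySketch {
    /**
     * Condensed view of the "retry by default" policy: give up only for a few
     * well-understood failures, otherwise keep retrying the same region server.
     */
    static boolean shouldRetry(IOException e, boolean serverOnline, int numberOfAttemptsSoFar) {
      if (e instanceof DoNotRetryIOException) {
        return false; // the server explicitly told us not to retry
      }
      // CallQueueTooBigException is raised by the rpc framework before the call
      // is executed, so on the very first attempt it is safe to give up and let
      // the procedure be dispatched again. On later attempts an earlier request
      // may already have been accepted, so giving up could cause a double assign.
      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
        return false;
      }
      if (!serverOnline) {
        return false; // the server is dead, so retrying against it is pointless
      }
      // Everything else (socket timeouts, aborted/stopped but not yet fully dead
      // servers, generic IOExceptions, ...) is retried on the same server.
      return true;
    }
  }

This is also why CallQueueTooBigOnceRsExecutor, which fails only the first call, still lets the
assignment complete, while FaultyRsExecutor in testCallQueueTooBigExceptionOnAssignment fails
every call and the assign is expected to fail.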