From a37c7f5a21efcf80de3734d182267a5cd5344e50 Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Wed, 20 Jan 2016 17:43:22 -0800
Subject: [PATCH] HBASE-15146 Don't block on Reader threads queueing to a
 scheduler queue

---
 .../hadoop/hbase/CallQueueTooBigException.java     | 33 ++++++++
 .../apache/hadoop/hbase/client/AsyncProcess.java   |  9 +-
 .../hbase/client/ConnectionImplementation.java     | 52 +-----------
 .../hbase/exceptions/ClientExceptionsUtil.java     | 95 ++++++++++++++++++++++
 .../hadoop/hbase/client/TestAsyncProcess.java      | 39 +++++++--
 .../hbase/exceptions/TestClientExceptionsUtil.java | 37 +++++++++
 .../hadoop/hbase/ipc/BalancedQueueRpcExecutor.java |  4 +-
 .../apache/hadoop/hbase/ipc/FifoRpcScheduler.java  | 12 ++-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java       |  4 +-
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java   |  2 +-
 .../org/apache/hadoop/hbase/ipc/RpcScheduler.java  |  2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     | 21 +++--
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java       |  8 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java    |  3 +-
 14 files changed, 243 insertions(+), 78 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
 create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
new file mode 100644
index 0000000..d07c657
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallQueueTooBigException extends IOException {
+  public CallQueueTooBigException() {
+    super();
+  }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 5102ec5..4ceb89a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1174,9 +1175,9 @@ class AsyncProcess {
       byte[] row = e.getValue().iterator().next().getAction().getRow();
       // Do not use the exception for updating cache because it might be coming from
       // any of the regions in the MultiAction.
-      // TODO: depending on type of exception we might not want to update cache at all?
       if (tableName != null) {
-        connection.updateCachedLocations(tableName, regionName, row, null, server);
+        connection.updateCachedLocations(tableName, regionName, row,
+          ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
       }
       for (Action<Row> action : e.getValue()) {
         Retry retry = manageError(
@@ -1293,7 +1294,7 @@ class AsyncProcess {
           // Failure: retry if it's make sense else update the errors lists
           if (result == null || result instanceof Throwable) {
             Row row = sentAction.getAction();
-            throwable = ConnectionImplementation.findException(result);
+            throwable = ClientExceptionsUtil.findException(result);
             // Register corresponding failures once per server/once per region.
             if (!regionFailureRegistered) {
               regionFailureRegistered = true;
@@ -1316,7 +1317,7 @@ class AsyncProcess {
              ++failed;
            }
          } else {
-
+
            if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
              AsyncProcess.this.connection.getConnectionMetrics().
                  updateServerStats(server, regionName, result);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index d730287..dc59e6e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -35,10 +35,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.RegionTooBusyException;
-import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -48,8 +45,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -68,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilit
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
-import org.apache.hadoop.hbase.quotas.ThrottlingException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -282,47 +278,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return ng;
   }
 
-  /**
-   * Look for an exception we know in the remote exception:
-   * - hadoop.ipc wrapped exceptions
-   * - nested exceptions
-   *
-   * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
-   *            ThrottlingException
-   * @return null if we didn't find the exception, the exception otherwise.
-   */
-  public static Throwable findException(Object exception) {
-    if (exception == null || !(exception instanceof Throwable)) {
-      return null;
-    }
-    Throwable cur = (Throwable) exception;
-    while (cur != null) {
-      if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
-          || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
-          || cur instanceof RetryImmediatelyException) {
-        return cur;
-      }
-      if (cur instanceof RemoteException) {
-        RemoteException re = (RemoteException) cur;
-        cur = re.unwrapRemoteException(
-            RegionOpeningException.class, RegionMovedException.class,
-            RegionTooBusyException.class);
-        if (cur == null) {
-          cur = re.unwrapRemoteException();
-        }
-        // unwrapRemoteException can return the exception given as a parameter when it cannot
-        //  unwrap it. In this case, there is no need to look further
-        // noinspection ObjectEquality
-        if (cur == re) {
-          return null;
-        }
-      } else {
-        cur = cur.getCause();
-      }
-    }
-
-    return null;
-  }
 
   @Override
   public HTableInterface getTable(String tableName) throws IOException {
@@ -1908,10 +1863,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     HRegionInfo regionInfo = oldLocation.getRegionInfo();
-    Throwable cause = findException(exception);
+    Throwable cause = ClientExceptionsUtil.findException(exception);
     if (cause != null) {
-      if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException
-          || cause instanceof ThrottlingException || cause instanceof MultiActionResultTooLarge) {
+      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
         // We know that the region is still on this region server
         return;
       }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
new file mode 100644
index 0000000..ebf1499
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.CallQueueTooBigException;
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.ThrottlingException;
+import org.apache.hadoop.ipc.RemoteException;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ClientExceptionsUtil {
+
+  private ClientExceptionsUtil() {}
+
+  public static boolean isMetaClearingException(Throwable cur) {
+    cur = findException(cur);
+
+    if (cur == null) {
+      return true;
+    }
+    return !isSpecialException(cur) || (cur instanceof RegionMovedException);
+  }
+
+  public static boolean isSpecialException(Throwable cur) {
+    return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
+        || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
+        || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
+        || cur instanceof CallQueueTooBigException);
+  }
+
+
+  /**
+   * Look for an exception we know in the remote exception:
+   * - hadoop.ipc wrapped exceptions
+   * - nested exceptions
+   *
+   * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
+   *            ThrottlingException
+   * @return null if we didn't find the exception, the exception otherwise.
+   */
+  public static Throwable findException(Object exception) {
+    if (exception == null || !(exception instanceof Throwable)) {
+      return null;
+    }
+    Throwable cur = (Throwable) exception;
+    while (cur != null) {
+      if (isSpecialException(cur)) {
+        return cur;
+      }
+      if (cur instanceof RemoteException) {
+        RemoteException re = (RemoteException) cur;
+        cur = re.unwrapRemoteException(
+            RegionOpeningException.class, RegionMovedException.class,
+            RegionTooBusyException.class);
+        if (cur == null) {
+          cur = re.unwrapRemoteException();
+        }
+        // unwrapRemoteException can return the exception given as a parameter when it cannot
+        //  unwrap it. In this case, there is no need to look further
+        // noinspection ObjectEquality
+        if (cur == re) {
+          return cur;
+        }
+      } else if (cur.getCause() != null) {
+        cur = cur.getCause();
+      } else {
+        return cur;
+      }
+    }
+
+    return null;
+  }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 067f2ad..fb8b20b 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -218,28 +219,35 @@ public class TestAsyncProcess {
 
   static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
 
-    public CallerWithFailure() {
+    private final IOException e;
+
+    public CallerWithFailure(IOException e) {
       super(100, 100, 9);
+      this.e = e;
     }
 
     @Override
     public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
         throws IOException, RuntimeException {
-      throw new IOException("test");
+      throw e;
     }
   }
 
+
   static class AsyncProcessWithFailure extends MyAsyncProcess {
 
-    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
+    private final IOException ioe;
+
+    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
       super(hc, conf, true);
+      this.ioe = ioe;
       serverTrackerTimeout = 1;
    }
 
    @Override
    protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
      callsCt.incrementAndGet();
-      return new CallerWithFailure();
+      return new CallerWithFailure(ioe);
    }
  }
 
@@ -825,7 +833,7 @@ public class TestAsyncProcess {
   public void testGlobalErrors() throws IOException {
     ClusterConnection conn = new MyConnectionImpl(conf);
     BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
-    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
     mutator.ap = ap;
 
     Assert.assertNotNull(mutator.ap.createServerErrorTracker());
@@ -842,6 +850,27 @@ public class TestAsyncProcess {
     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
   }
+
+  @Test
+  public void testCallQueueTooLarge() throws IOException {
+    ClusterConnection conn = new MyConnectionImpl(conf);
+    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
+    mutator.ap = ap;
+
+    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+
+    Put p = createPut(1, true);
+    mutator.mutate(p);
+
+    try {
+      mutator.flush();
+      Assert.fail();
+    } catch (RetriesExhaustedWithDetailsException expected) {
+    }
+    // Checking that the ErrorsServers came into play and didn't make us stop immediately
+    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+  }
 
   /**
    * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
   *  2 threads: 1 per server, this whatever the number of regions.
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
new file mode 100644
index 0000000..968e55c
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import com.google.protobuf.ServiceException;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("ThrowableInstanceNeverThrown")
+public class TestClientExceptionsUtil {
+
+  @Test
+  public void testFindException() throws Exception {
+    IOException ioe = new IOException("Tesst");
+    ServiceException se = new ServiceException(ioe);
+    assertEquals(ioe, ClientExceptionsUtil.findException(se));
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 56424df..79b4ec8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -72,9 +72,9 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   }
 
   @Override
-  public void dispatch(final CallRunner callTask) throws InterruptedException {
+  public boolean dispatch(final CallRunner callTask) throws InterruptedException {
     int queueIndex = balancer.getNextQueue();
-    queues.get(queueIndex).put(callTask);
+    return queues.get(queueIndex).offer(callTask);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 621a8ef..5e104eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@@ -34,6 +35,7 @@ public class FifoRpcScheduler extends RpcScheduler {
 
   private final int handlerCount;
   private final int maxQueueLength;
+  private final AtomicInteger queueSize = new AtomicInteger(0);
   private ThreadPoolExecutor executor;
 
   public FifoRpcScheduler(Configuration conf, int handlerCount) {
@@ -65,14 +67,22 @@ public class FifoRpcScheduler extends RpcScheduler {
   }
 
   @Override
-  public void dispatch(final CallRunner task) throws IOException, InterruptedException {
+  public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
+    // Executors provide no offer, so make our own.
+    int queued = queueSize.getAndIncrement();
+    if (maxQueueLength > 0 && queued >= maxQueueLength) {
+      queueSize.decrementAndGet();
+      return false;
+    }
     executor.submit(new Runnable() {
       @Override
       public void run() {
         task.setStatus(RpcServer.getStatus());
         task.run();
+        queueSize.decrementAndGet();
       }
     });
+    return true;
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 1be8c65..544370d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -160,7 +160,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   }
 
   @Override
-  public void dispatch(final CallRunner callTask) throws InterruptedException {
+  public boolean dispatch(final CallRunner callTask) throws InterruptedException {
     RpcServer.Call call = callTask.getCall();
     int queueIndex;
     if (isWriteRequest(call.getHeader(), call.param)) {
@@ -170,7 +170,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     } else {
       queueIndex = numWriteQueues + readBalancer.getNextQueue();
     }
-    queues.get(queueIndex).put(callTask);
+    return queues.get(queueIndex).offer(callTask);
   }
 
   private boolean isWriteRequest(final RequestHeader header, final Message param) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 27750a7..017bf39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -86,7 +86,7 @@ public abstract class RpcExecutor {
   public abstract int getQueueLength();
 
   /** Add the request to the executor queue */
-  public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
+  public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
 
   /** Returns the list of request queues */
   protected abstract List<BlockingQueue<CallRunner>> getQueues();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index f273865..fffe7f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -58,7 +58,7 @@ public abstract class RpcScheduler {
    *
    * @param task the request to be dispatched
    */
-  public abstract void dispatch(CallRunner task) throws IOException, InterruptedException;
+  public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException;
 
   /** Retrieves length of the general queue for metrics. */
   public abstract int getGeneralQueueLength();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1155751..922174d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -67,6 +67,7 @@ import javax.security.sasl.SaslServer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -1175,13 +1176,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
   }
 
-  @SuppressWarnings("serial")
-  public static class CallQueueTooBigException extends IOException {
-    CallQueueTooBigException() {
-      super();
-    }
-  }
-
   /** Reads calls from a connection and queues them for handling. */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value="VO_VOLATILE_INCREMENT",
@@ -1880,7 +1874,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           : null;
       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
               totalRequestSize, traceInfo, this.addr);
-      scheduler.dispatch(new CallRunner(RpcServer.this, call));
+
+      if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
+        callQueueSize.add(-1 * call.getSize());
+
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+        InetSocketAddress address = getListenerAddress();
+        setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
+            "Call queue is full on " + (address != null ? address : "(channel closed)") +
+            ", too many items queued ?");
+        responder.doRespond(call);
+      }
     }
 
     private boolean authorizeConnection() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index b8e9c52..8de714d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -190,15 +190,15 @@ public class SimpleRpcScheduler extends RpcScheduler {
   }
 
   @Override
-  public void dispatch(CallRunner callTask) throws InterruptedException {
+  public boolean dispatch(CallRunner callTask) throws InterruptedException {
     RpcServer.Call call = callTask.getCall();
     int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
     if (priorityExecutor != null && level > highPriorityLevel) {
-      priorityExecutor.dispatch(callTask);
+      return priorityExecutor.dispatch(callTask);
     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
-      replicationExecutor.dispatch(callTask);
+      return replicationExecutor.dispatch(callTask);
     } else {
-      callExecutor.dispatch(callTask);
+      return callExecutor.dispatch(callTask);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 16465d2..6370127 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -687,7 +688,7 @@ public class TestHCM {
       Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
 
       // Check that we unserialized the exception as expected
-      Throwable cause = ConnectionImplementation.findException(e.getCause(0));
+      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
       Assert.assertNotNull(cause);
       Assert.assertTrue(cause instanceof RegionMovedException);
     }
-- 
2.7.0