Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java	(revision 0)
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * This tests some race conditions that can happen
+ * occasionally, but not every time.
+ */
+@Category(MediumTests.class)
+public class TestSizeBasedThrottler {
+
+  private static final int REPEATS = 100;
+
+  private Thread makeThread(final SizeBasedThrottler throttler,
+      final AtomicBoolean failed, final int delta,
+      final int limit, final CountDownLatch latch) {
+
+    Thread ret = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          latch.await();
+          if (throttler.increase(delta) > limit) {
+            failed.set(true);
+          }
+          throttler.decrease(delta);
+        } catch (Exception e) {
+          failed.set(true);
+        }
+      }
+    });
+
+    ret.start();
+    return ret;
+  }
+
+  private void runGenericTest(int threshold, int delta, int maxValueAllowed,
+      int numberOfThreads, long timeout) {
+    SizeBasedThrottler throttler = new SizeBasedThrottler(threshold);
+    AtomicBoolean failed = new AtomicBoolean(false);
+
+    ArrayList<Thread> threads = new ArrayList<Thread>(numberOfThreads);
+    CountDownLatch latch = new CountDownLatch(1);
+    long timeElapsed = 0;
+
+    for (int i = 0; i < numberOfThreads; ++i) {
+      threads.add(makeThread(throttler, failed, delta, maxValueAllowed, latch));
+    }
+
+    latch.countDown();
+    for (Thread t : threads) {
+      try {
+        long beforeJoin = System.currentTimeMillis();
+        t.join(timeout - timeElapsed);
+        timeElapsed += System.currentTimeMillis() - beforeJoin;
+        if (t.isAlive() || timeElapsed >= timeout) {
+          fail("Timeout reached.");
+        }
+      } catch (InterruptedException e) {
+        fail("Got InterruptedException");
+      }
+    }
+
+    assertFalse(failed.get());
+  }
+
+  @Test
+  public void testSmallIncreases(){
+    for (int i = 0; i < REPEATS; ++i) {
+      runGenericTest(
+          10, // threshold
+          1,  // delta
+          15, // fail if throttler's value
+              // exceeds 15
+          1000, // use 1000 threads
+          200 // wait for 200ms
+          );
+    }
+  }
+
+  @Test
+  public void testBigIncreases() {
+    for (int i = 0; i < REPEATS; ++i) {
+      runGenericTest(
+          1, // threshold
+          2, // delta
+          4, // fail if throttler's value
+             // exceeds 4
+          1000, // use 1000 threads
+          200 // wait for 200ms
+          );
+    }
+  }
+
+  @Test
+  public void testIncreasesEqualToThreshold(){
+    for (int i = 0; i < REPEATS; ++i) {
+      runGenericTest(
+          1, // threshold
+          1, // delta
+          2, // fail if throttler's value
+             // exceeds 2
+          1000, // use 1000 threads
+          200 // wait for 200ms
+          );
+    }
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java	(revision 0)
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Utility class that can be used to implement
+ * queues with limited capacity (in terms of memory).
+ * It maintains an internal counter and provides
+ * two operations: increase and decrease.
+ * Increase blocks until the internal counter is lower than
+ * the given threshold and then increases the internal counter.
+ * Decrease decreases the internal counter and wakes up
+ * waiting threads if the counter drops below the threshold.
+ *
+ * This implementation allows the internal counter to become
+ * greater than the threshold. It happens
+ * when the internal counter is lower than the threshold and
+ * increase is called with a 'delta' big enough
+ * that the sum of delta and the internal counter is greater than
+ * the threshold. This is not a bug, this is a feature.
+ * It solves some problems:
+ *   - a thread calling increase with a big parameter will not be
+ *     starved by other threads calling increase with small
+ *     arguments.
+ *   - a thread calling increase with an argument greater than
+ *     the threshold won't deadlock. This is useful when throttling
+ *     queues - you can submit an object that is bigger than the limit.
+ *
+ * increase, decrease and getCurrentValue are synchronized on this
+ * instance, so the decision whether to block and the increment are
+ * atomic. An increase is admitted only while the counter is below the
+ * threshold, so the counter overshoots the threshold by less than the
+ * delta of the admitted call.
+ */
+@InterfaceAudience.Private
+public class SizeBasedThrottler {
+
+  private final long threshold;
+  private final AtomicLong currentSize;
+
+  /**
+   * Creates a SizeBasedThrottler with the provided threshold.
+   *
+   * @param threshold threshold used by instance; must be greater than 0
+   * @throws IllegalArgumentException if threshold is not positive
+   */
+  public SizeBasedThrottler(long threshold) {
+    if (threshold <= 0) {
+      throw new IllegalArgumentException("Threshold must be greater than 0");
+    }
+    this.threshold = threshold;
+    this.currentSize = new AtomicLong(0);
+  }
+
+  /**
+   * Blocks until the internal counter is lower than the threshold
+   * and then increases the internal counter by delta.
+   *
+   * The check and the increment happen while holding this instance's
+   * monitor; wait() releases it so that decrease can lower the counter
+   * and wake this thread up.
+   *
+   * @param delta increase internal counter by this value
+   * @return new value of internal counter
+   * @throws InterruptedException when interrupted during waiting
+   */
+  public synchronized long increase(long delta) throws InterruptedException {
+    // Loop to re-check the condition after every wakeup (including spurious ones).
+    while (currentSize.get() >= threshold) {
+      wait();
+    }
+    return currentSize.addAndGet(delta);
+  }
+
+  /**
+   * Decreases the internal counter. Wakes up waiting threads if this call
+   * moved the counter from at-or-above the threshold to below it.
+   *
+   * @param delta decrease internal counter by this value
+   * @return new value of internal counter
+   */
+  public synchronized long decrease(long delta) {
+    final long newSize = currentSize.addAndGet(-delta);
+
+    // Waiters block only while the counter is >= threshold, so a wakeup is
+    // needed only when this call crossed the threshold downwards.
+    if (newSize < threshold && newSize + delta >= threshold) {
+      notifyAll();
+    }
+
+    return newSize;
+  }
+
+  /**
+   * @return current value of internal counter
+   */
+  public synchronized long getCurrentValue() {
+    return currentSize.get();
+  }
+
+  /**
+   * @return threshold
+   */
+  public long getThreshold() {
+    return threshold;
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java	(revision 1400473)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java	(working copy)
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -59,6 +60,7 @@
   private MetricsContext context;
   private final RegionServerDynamicStatistics rsDynamicStatistics;
   private Method updateMbeanInfoIfMetricsListChanged = null;
+  private final HRegionServer regionServer; // source of the response queue size metric; may be null
   private static final Log LOG =
     LogFactory.getLog(RegionServerDynamicStatistics.class);
 
@@ -74,13 +76,14 @@
    */
   public final MetricsRegistry registry = new MetricsRegistry();
 
-  private RegionServerDynamicMetrics() {
+  private RegionServerDynamicMetrics(HRegionServer regionServer) {
     this.context = MetricsUtil.getContext("hbase-dynamic");
     this.metricsRecord = MetricsUtil.createRecord(
                             this.context,
                             "RegionServerDynamicStatistics");
     context.registerUpdater(this);
     this.rsDynamicStatistics = new RegionServerDynamicStatistics(this.registry);
+    this.regionServer = regionServer;
     try {
       updateMbeanInfoIfMetricsListChanged =
         this.rsDynamicStatistics.getClass().getSuperclass()
@@ -92,9 +95,9 @@
     }
   }
 
-  public static RegionServerDynamicMetrics newInstance() {
+  public static RegionServerDynamicMetrics newInstance(HRegionServer regionServer) {
     RegionServerDynamicMetrics metrics =
-      new RegionServerDynamicMetrics();
+      new RegionServerDynamicMetrics(regionServer);
     return metrics;
   }
 
@@ -184,6 +187,13 @@
       for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericMetrics().entrySet()) {
         this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
       }
+
+      /* export estimated size of all response queues */
+      if (regionServer != null) {
+        long responseQueueSize = regionServer.getResponseQueueSize();
+        this.setNumericMetric("responseQueuesSize", responseQueueSize);
+      }
+
       /* get dynamically created numeric metrics, and push the metrics.
        *  These ones aren't to be reset; they are cumulative. */
       for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericPersistentMetrics().entrySet()) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1400473)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -53,7 +53,6 @@
 
 import javax.management.ObjectName;
 
-import com.google.protobuf.Message;
 import org.apache.commons.lang.mutable.MutableDouble;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -103,8 +102,8 @@
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -113,6 +112,7 @@
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -152,6 +152,8 @@
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -196,8 +198,8 @@
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -228,12 +230,10 @@
 
 import com.google.common.base.Function;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -262,6 +262,9 @@
 
   protected long maxScannerResultSize;
 
+  // rpcServer as an HBaseServer, or null if it is some other RpcServer implementation.
+  private HBaseServer server;
+
   // Cache flushing
   protected MemStoreFlusher cacheFlusher;
 
@@ -518,6 +521,7 @@
         conf.getInt("hbase.regionserver.metahandler.count", 10),
         conf.getBoolean("hbase.rpc.verbose", false),
         conf, HConstants.QOS_THRESHOLD);
+    if (rpcServer instanceof HBaseServer) server = (HBaseServer) rpcServer;
     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
 
@@ -1197,7 +1201,7 @@
     this.hlog = setupWALAndReplication();
     // Init in here rather than in constructor after thread name has been set
     this.metrics = new RegionServerMetrics();
-    this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
+    this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
     startServiceThreads();
     LOG.info("Serving as " + this.serverNameFromMasterPOV +
       ", RPC listening on " + this.isa +
@@ -4131,4 +4135,15 @@
   private String getMyEphemeralNodePath() {
     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
   }
+
+  /**
+   * @return estimated total size, in bytes, of responses waiting in the RPC
+   *         server's response queues, or 0 if the RPC server is not an HBaseServer
+   */
+  public long getResponseQueueSize(){
+    if (server != null) {
+      return server.getResponseQueueSize();
+    }
+    return 0;
+  }
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(revision 1400473)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(working copy)
@@ -26,6 +26,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -52,12 +53,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -68,28 +69,29 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.SizeBasedThrottler;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -97,21 +99,20 @@
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-
 import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceInfo;
 import org.cloudera.htrace.impl.NullSpan;
-import org.cloudera.htrace.Trace;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+
 
 /** A client for an IPC service.  IPC calls take a single Protobuf message as a
  * parameter, and return a single Protobuf message as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -256,6 +257,14 @@
   protected final boolean tcpKeepAlive; // if T then use keepalives
   protected final long purgeTimeout; // in milliseconds
 
+  // responseQueuesSizeThrottler is shared among all responseQueues,
+  // it bounds memory occupied by responses in all responseQueues
+  final SizeBasedThrottler responseQueuesSizeThrottler;
+
+  // RESPONSE_QUEUES_MAX_SIZE limits the total size of responses in all response queues combined
+  private static final long DEFAULT_RESPONSE_QUEUES_MAX_SIZE = 1024 * 1024 * 1024; // 1G
+  private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
+
   volatile protected boolean running = true; // true while server runs
   protected BlockingQueue<Call> callQueue; // queued calls
   protected final Counter callQueueSize = new Counter();
@@ -987,7 +996,7 @@
         //
         // Extract the first call
         //
-        call = responseQueue.removeFirst();
+        call = responseQueue.peek();
         SocketChannel channel = call.connection.channel;
         if (LOG.isDebugEnabled()) {
           LOG.debug(getName() + ": responding to #" + call.id + " from " +
@@ -998,9 +1007,13 @@
         //
         int numBytes = channelWrite(channel, call.response);
         if (numBytes < 0) {
+          // Error flag is set, so returning here closes connection and
+          // clears responseQueue.
           return true;
         }
         if (!call.response.hasRemaining()) {
+          responseQueue.poll();
+          responseQueuesSizeThrottler.decrease(call.response.limit());
           responseQueueLen--;
           call.connection.decRpcCount();
           //noinspection RedundantIfStatement
@@ -1014,12 +1027,6 @@
                     call.connection + " Wrote " + numBytes + " bytes.");
           }
         } else {
-          //
-          // If we were unable to write the entire response out, then
-          // insert in Selector queue.
-          //
-          call.connection.responseQueue.addFirst(call);
-
           if (inHandler) {
             // set the serve time when the response has to be sent later
             call.timestamp = System.currentTimeMillis();
@@ -1074,15 +1081,31 @@
       responseQueueLen++;
 
       boolean doRegister = false;
+      boolean closed;
+      try {
+        responseQueuesSizeThrottler.increase(call.response.limit());
+      } catch (InterruptedException ie) {
+        throw (InterruptedIOException) new InterruptedIOException(ie.getMessage()).initCause(ie);
+      }
       synchronized (call.connection.responseQueue) {
-        call.connection.responseQueue.addLast(call);
-        if (call.connection.responseQueue.size() == 1) {
-          doRegister = !processResponse(call.connection.responseQueue, false);
+        closed = call.connection.closed;
+        if (!closed) {
+          call.connection.responseQueue.addLast(call);
+
+          if (call.connection.responseQueue.size() == 1) {
+            doRegister = !processResponse(call.connection.responseQueue, false);
+          }
         }
       }
       if (doRegister) {
         enqueueInSelector(call);
       }
+      if (closed) {
+        // Connection was closed when we tried to submit the response, but the
+        // throttler was already increased by limit() above; undo it here, as
+        // closeConnection() never sees this call (it was not queued).
+        responseQueuesSizeThrottler.decrease(call.response.limit());
+      }
     }
 
     private synchronized void incPending() { // call waiting to be enqueued.
@@ -1107,6 +1130,8 @@
                                          //version are read
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
+
+    protected volatile boolean closed = false; // indicates if connection was closed
     protected SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
@@ -1691,6 +1716,7 @@
     }
 
     protected synchronized void close() {
+      closed = true;
       disposeSasl();
       data = null;
       dataLengthBuffer = null;
@@ -1946,6 +1972,9 @@
 
     this.delayedCalls = new AtomicInteger(0);
 
+    this.responseQueuesSizeThrottler = new SizeBasedThrottler(
+        conf.getLong(RESPONSE_QUEUES_MAX_SIZE, DEFAULT_RESPONSE_QUEUES_MAX_SIZE));
+
     // Create the responder here
     responder = new Responder();
     this.authorize =
@@ -1990,6 +2019,14 @@
       }
     }
     connection.close();
+    long bytes = 0;
+    synchronized (connection.responseQueue) {
+      for (Call c : connection.responseQueue) {
+        bytes += c.response.limit();
+      }
+      connection.responseQueue.clear();
+    }
+    responseQueuesSizeThrottler.decrease(bytes);
     rpcMetrics.numOpenConnections.set(numConnections);
   }
 
@@ -2244,4 +2281,11 @@
   public static RpcCallContext getCurrentCall() {
     return CurCall.get();
   }
+
+  /**
+   * @return estimated total size, in bytes, of responses waiting in all response queues
+   */
+  public long getResponseQueueSize(){
+    return responseQueuesSizeThrottler.getCurrentValue();
+  }
 }