diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 28c4c22..bd1e5fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -58,12 +58,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * constant-time {@link #cacheBlock} and {@link #getBlock} operations.

* * Contains three levels of block priority to allow for - * scan-resistance and in-memory families + * scan-resistance and in-memory families * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An * in-memory column family is a column family that should be served from memory if possible): * single-access, multiple-accesses, and in-memory priority. * A block is added with an in-memory priority flag if - * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, + * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, * otherwise a block becomes a single access * priority the first time it is read into this block cache. If a block is accessed again while * in cache, it is marked as a multiple access priority block. This delineation of blocks is used @@ -438,6 +438,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { @Override public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, boolean updateCacheMetrics) { + updateCacheMetrics = false; LruCachedBlock cb = map.get(cacheKey); if (cb == null) { if (!repeat && updateCacheMetrics) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 02fe1df..5d95787 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -568,7 +568,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { responseBlockSize += blockSize; } - public synchronized void sendResponseIfReady() throws IOException { + public void sendResponseIfReady() throws IOException { this.responder.doRespond(this); } @@ -653,7 +653,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private class Reader implements Runnable { - private volatile boolean adding = false; + // Changing adding only under lock. See how it is used below. + private boolean adding = false; private final Selector readSelector; Reader() throws IOException { @@ -672,14 +673,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } - private synchronized void doRunLoop() { + private void doRunLoop() { while (running) { try { readSelector.select(); - while (adding) { - this.wait(1000); + synchronized (this) { + while (adding) { + this.wait(0, 1000); + } } - Iterator iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); @@ -707,7 +709,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * in while(adding) for finishAdd call */ public void startAdd() { - adding = true; + synchronized (this) { + adding = true; + } readSelector.wakeup(); } @@ -1306,6 +1310,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (socketSendBufferSize != 0) { try { socket.setSendBufferSize(socketSendBufferSize); + LOG.info("SOCKET BUFFER SIZE=" + socket.getSendBufferSize()); } catch (IOException e) { LOG.warn("Connection: unable to set socket send buffer size to " + socketSendBufferSize); @@ -2065,7 +2070,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { final InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { - + if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) { this.reservoir = new BoundedByteBufferPool( conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024), @@ -2083,7 +2088,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.services = services; this.bindAddress = bindAddress; this.conf = conf; - this.socketSendBufferSize = 0; + this.socketSendBufferSize = 256 * 1024 * 1024; this.maxQueueSize = this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10); @@ -2275,11 +2280,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } long requestSize = param.getSerializedSize(); long responseSize = result.getSerializedSize(); - metrics.dequeuedCall(qTime); - metrics.processedCall(processingTime); - metrics.totalCall(totalTime); - metrics.receivedRequest(requestSize); - metrics.sentResponse(responseSize); + // metrics.dequeuedCall(qTime); + // metrics.processedCall(processingTime); + // metrics.totalCall(totalTime); + // metrics.receivedRequest(requestSize); + // metrics.sentResponse(responseSize); // log any RPC responses that are slower than the configured warn // response time or larger than configured warning size boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1); @@ -2301,7 +2306,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (e instanceof ServiceException) e = e.getCause(); // increment the number of requests that were exceptions. - metrics.exception(e); + // metrics.exception(e); if (e instanceof LinkageError) throw new DoNotRetryIOException(e); if (e instanceof IOException) throw (IOException)e; @@ -2453,7 +2458,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) throws IOException { long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); - if (count > 0) this.metrics.sentBytes(count); + // if (count > 0) this.metrics.sentBytes(count); return count; } @@ -2474,9 +2479,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? channel.read(buffer) : channelIO(channel, null, buffer); - if (count > 0) { - metrics.receivedBytes(count); - } + // if (count > 0) { + // metrics.receivedBytes(count); + // } return count; }