Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
@@ -85,7 +86,7 @@
       new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
           new SynchronousQueue(),
-          new DaemonThreadFactory());
+          new DaemonThreadFactory("htable-thread-"));
   static {
     ((ThreadPoolExecutor)multiPutThreadPool).allowCoreThreadTimeOut(true);
   }
@@ -1158,30 +1159,6 @@
     }
   }

-  static class DaemonThreadFactory implements ThreadFactory {
-    final ThreadGroup group;
-    int threadNumber = 1;
-    final String namePrefix;
-
-    DaemonThreadFactory() {
-      SecurityManager s = System.getSecurityManager();
-      group = (s != null)? s.getThreadGroup() :
-        Thread.currentThread().getThreadGroup();
-      namePrefix = "hbase-table-thread-";
-    }
-
-    public Thread newThread(Runnable r) {
-      Thread t = new Thread(group, r,
-          namePrefix + (threadNumber++),
-          0);
-      if (!t.isDaemon())
-        t.setDaemon(true);
-      if (t.getPriority() != Thread.NORM_PRIORITY)
-        t.setPriority(Thread.NORM_PRIORITY);
-      return t;
-    }
-  }
-
   /**
    * Enable or disable region cache prefetch for the table. It will be
    * applied for the given table's all HTable instances who share the same
Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
@@ -43,26 +43,29 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
 import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
@@ -283,6 +286,9 @@
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
conf.getInt("ipc.server.listen.queue.size", 128); + /** The ipc deserialization thread pool */ + protected ThreadPoolExecutor deserializationThreadPool; + public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode @@ -299,7 +305,34 @@ acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); + + // initialize the ipc deserializationThreadPool thread pool + int deserializationPoolMaxSize = conf.getInt("ipc.server.deserialization.threadPool.maxSize", + Runtime.getRuntime().availableProcessors() + 1); + deserializationThreadPool = new ThreadPoolExecutor( + 1, // the core pool size + deserializationPoolMaxSize, // the max pool size + 60L, TimeUnit.SECONDS, // keep-alive time for each worker thread + new SynchronousQueue(), // direct handoffs + new DaemonThreadFactory("IPC-Deserialization"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // The submission (listener) thread will be blocked until the thread pool frees up. + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException( + "Failed to requeue the rejected request because of ", e); + } + } + }); + if (LOG.isDebugEnabled()) { + LOG.debug("Initialize the deserializationThreadPool with maxium " + + deserializationPoolMaxSize + " threads"); + } } + /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections * that will be cleanedup per run. The criteria for cleanup is the time @@ -365,10 +398,11 @@ iter.remove(); try { if (key.isValid()) { - if (key.isAcceptable()) + if (key.isAcceptable()) { doAccept(key); - else if (key.isReadable()) - doRead(key); + } else if (key.isReadable()) { + doAsyncRead(key); + } } } catch (IOException ignored) { } @@ -390,11 +424,6 @@ closeCurrentConnection(key); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ignored) {} - } - } catch (InterruptedException e) { - if (running) { // unexpected -- log it - LOG.info(getName() + " caught: " + - StringUtils.stringifyException(e)); } } catch (Exception e) { closeCurrentConnection(key); @@ -419,6 +448,37 @@ } } + private void doAsyncRead(final SelectionKey readSelectionKey) { + unsetReadInterest(readSelectionKey); + + // submit the doRead request to the thread pool in order to deserialize the data in parallel + try { + deserializationThreadPool.submit(new Runnable() { + @Override + public void run() { + try { + doRead(readSelectionKey); + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Caught: " + StringUtils.stringifyException(e) + + " when processing " + readSelectionKey.attachment()); + } + } finally { + setReadInterest(readSelectionKey); + // wake up the selector from the blocking function select() + selector.wakeup(); + } + } + }); + } catch (Throwable e) { + setReadInterest(readSelectionKey); + if (LOG.isDebugEnabled()) { + LOG.debug("Caught " + e.getMessage() + " when processing the remote connection " + + readSelectionKey.attachment().toString()); + } + } + } + private void closeCurrentConnection(SelectionKey key) { if (key != null) { Connection c = (Connection)key.attachment(); @@ -438,7 +498,7 @@ Connection c; ServerSocketChannel server = (ServerSocketChannel) key.channel(); // accept up to 10 connections - for (int i=0; i<10; i++) { + for (int i = 0; 
         SocketChannel channel = server.accept();
         if (channel==null) return;
@@ -1319,4 +1379,24 @@
     int nBytes = initialRemaining - buf.remaining();
     return (nBytes > 0) ? nBytes : ret;
   }
+
+  /**
+   * Set the OP_READ interest for the selection key
+   * @param selectionKey
+   */
+  private static void setReadInterest(final SelectionKey selectionKey) {
+    synchronized (selectionKey) {
+      selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
+    }
+  }
+
+  /**
+   * Unset the OP_READ interest for the selection key
+   * @param selectionKey
+   */
+  private static void unsetReadInterest(final SelectionKey selectionKey) {
+    synchronized (selectionKey) {
+      selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_READ));
+    }
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/util/DaemonThreadFactory.java
===================================================================
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/util/DaemonThreadFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.ThreadFactory;
+
+public class DaemonThreadFactory implements ThreadFactory {
+  final ThreadGroup group;
+  int threadNumber = 1;
+  final String namePrefix;
+
+  public DaemonThreadFactory(String namePrefix) {
+    SecurityManager s = System.getSecurityManager();
+    group = (s != null) ? s.getThreadGroup() :
+        Thread.currentThread().getThreadGroup();
+    this.namePrefix = namePrefix;
+  }
+
+  public Thread newThread(Runnable r) {
+    Thread t = new Thread(group, r,
+        namePrefix + (threadNumber++),
+        0);
+    if (!t.isDaemon())
+      t.setDaemon(true);
+    if (t.getPriority() != Thread.NORM_PRIORITY)
+      t.setPriority(Thread.NORM_PRIORITY);
+    return t;
+  }
+}