diff --git a/pom.xml b/pom.xml
index 6566a1c..da0cc45 100644
--- a/pom.xml
+++ b/pom.xml
@@ -887,7 +887,7 @@
2.4.0a
1.5.8
1.0.1
- 0.7.0
+ 0.8.0
3.4.2
0.0.1-SNAPSHOT
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
index 996a289..5193299 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
@@ -20,8 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
@@ -34,24 +32,10 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer;
-import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer.Args;
-import org.apache.hadoop.hbase.thrift.ThriftServer;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner;
import org.apache.hadoop.hbase.thrift.ThriftUtilities;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
/**
* HRegionThriftServer - this class starts up a Thrift server in the same
@@ -68,43 +52,59 @@ public class HRegionThriftServer extends Thread {
public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
public static final int DEFAULT_LISTEN_PORT = 9090;
- private HRegionServer rs;
- private Configuration conf;
-
- private int port;
- private boolean nonblocking;
- private String bindIpAddress;
- private String transport;
- private String protocol;
- volatile private TServer tserver;
-
- /**
- * Whether requests should be redirected to other RegionServers if the
- * specified region is not hosted by this RegionServer.
- */
- private boolean redirect;
+ private final HRegionServer rs;
+ private final ThriftServerRunner serverRunner;
/**
* Create an instance of the glue object that connects the
* RegionServer with the standard ThriftServer implementation
*/
- HRegionThriftServer(HRegionServer regionServer, Configuration conf) {
+ HRegionThriftServer(HRegionServer regionServer, Configuration conf)
+ throws IOException {
+ super("Region Thrift Server");
this.rs = regionServer;
- this.conf = conf;
+ this.serverRunner =
+ new ThriftServerRunner(conf, new HBaseHandlerRegion(conf));
}
/**
- * Inherit the Handler from the standard ThriftServer. This allows us
+ * Stop ThriftServer
+ */
+ void shutdown() {
+ serverRunner.shutdown();
+ }
+
+ @Override
+ public void run() {
+ serverRunner.run();
+ }
+
+ /**
+ * Inherit the Handler from the standard ThriftServerRunner. This allows us
* to use the default implementation for most calls. We override certain calls
* for performance reasons
*/
- private class HBaseHandlerRegion extends ThriftServer.HBaseHandler {
+ private class HBaseHandlerRegion extends ThriftServerRunner.HBaseHandler {
+
+ /**
+ * Whether requests should be redirected to other RegionServers if the
+ * specified region is not hosted by this RegionServer.
+ */
+ private boolean redirect;
HBaseHandlerRegion(final Configuration conf) throws IOException {
super(conf);
initialize(conf);
}
+ /**
+ * Read and initialize config parameters
+ */
+ private void initialize(Configuration conf) {
+ this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
+ false);
+ }
+
// TODO: Override more methods to short-circuit for performance
/**
@@ -153,91 +153,4 @@ public class HRegionThriftServer extends Thread {
}
}
}
-
- /**
- * Read and initialize config parameters
- */
- private void initialize(Configuration conf) {
- this.port = conf.getInt("hbase.regionserver.thrift.port",
- DEFAULT_LISTEN_PORT);
- this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress");
- this.protocol = conf.get("hbase.regionserver.thrift.protocol");
- this.transport = conf.get("hbase.regionserver.thrift.transport");
- this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking",
- false);
- this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
- false);
- }
-
- /**
- * Stop ThriftServer
- */
- void shutdown() {
- if (tserver != null) {
- tserver.stop();
- tserver = null;
- }
- }
-
- @Override
- public void run() {
- try {
- HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf);
- Hbase.Processor processor =
- new Hbase.Processor(handler);
-
- TProtocolFactory protocolFactory;
- if (this.protocol != null && this.protocol.equals("compact")) {
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- protocolFactory = new TBinaryProtocol.Factory();
- }
-
- if (this.nonblocking) {
- TNonblockingServerTransport serverTransport =
- new TNonblockingServerSocket(this.port);
- TFramedTransport.Factory transportFactory =
- new TFramedTransport.Factory();
-
- TNonblockingServer.Args serverArgs =
- new TNonblockingServer.Args(serverTransport);
- serverArgs.processor(processor);
- serverArgs.transportFactory(transportFactory);
- serverArgs.protocolFactory(protocolFactory);
- LOG.info("starting HRegionServer Nonblocking Thrift server on " +
- this.port);
- LOG.info("HRegionServer Nonblocking Thrift server does not " +
- "support address binding.");
- tserver = new TNonblockingServer(serverArgs);
- } else {
- InetAddress listenAddress = null;
- if (this.bindIpAddress != null) {
- listenAddress = InetAddress.getByName(this.bindIpAddress);
- } else {
- listenAddress = InetAddress.getLocalHost();
- }
- TServerTransport serverTransport = new TServerSocket(
- new InetSocketAddress(listenAddress, port));
-
- TTransportFactory transportFactory;
- if (this.transport != null && this.transport.equals("framed")) {
- transportFactory = new TFramedTransport.Factory();
- } else {
- transportFactory = new TTransportFactory();
- }
-
- TBoundedThreadPoolServer.Args serverArgs =
- new TBoundedThreadPoolServer.Args(serverTransport, conf);
- serverArgs.processor(processor);
- serverArgs.protocolFactory(protocolFactory);
- serverArgs.transportFactory(transportFactory);
- LOG.info("starting HRegionServer ThreadPool Thrift server on " +
- listenAddress + ":" + this.port);
- tserver = new TBoundedThreadPoolServer(serverArgs);
- }
- tserver.serve();
- } catch (Exception e) {
- LOG.warn("Unable to start HRegionServerThrift interface.", e);
- }
- }
}
diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/HbaseTThreadedSelectorServerArgs.java b/src/main/java/org/apache/hadoop/hbase/thrift/HbaseTThreadedSelectorServerArgs.java
new file mode 100644
index 0000000..cd26ab6
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/thrift/HbaseTThreadedSelectorServerArgs.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TThreadedSelectorServer.Args} that reads its settings from a Hadoop
+ * {@link Configuration}, keeping the current value for any key that is unset.
+ */
+public class HbaseTThreadedSelectorServerArgs
+    extends TThreadedSelectorServer.Args {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HbaseTThreadedSelectorServerArgs.class);
+
+  /** Number of selector threads for reading and writing sockets. */
+  public static final String SELECTOR_THREADS_CONF_KEY =
+      "hbase.thrift.selector.threads";
+
+  /** Number of threads for processing the thrift calls. */
+  public static final String WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.worker.threads";
+
+  /** Time to wait for the server to stop gracefully. */
+  public static final String STOP_TIMEOUT_CONF_KEY =
+      "hbase.thrift.stop.timeout.seconds";
+
+  /** Maximum number of accepted elements per selector. */
+  public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY =
+      "hbase.thrift.accept.queue.size.per.selector";
+
+  /** The strategy for handling new accepted connections. */
+  public static final String ACCEPT_POLICY_CONF_KEY =
+      "hbase.thrift.accept.policy";
+
+  /**
+   * Creates the args for {@code transport}, then applies {@code conf}.
+   * @param transport non-blocking server transport passed to the superclass
+   * @param conf configuration holding the {@code hbase.thrift.*} settings
+   */
+  public HbaseTThreadedSelectorServerArgs(
+      TNonblockingServerTransport transport, Configuration conf) {
+    super(transport);
+    readConf(conf);
+  }
+
+  /**
+   * Applies the selector-server settings in {@code conf} and logs them.
+   * @throws IllegalArgumentException if the configured accept policy does not
+   *         name an {@link AcceptPolicy} constant
+   */
+  private void readConf(Configuration conf) {
+    int selectorThreads =
+        conf.getInt(SELECTOR_THREADS_CONF_KEY, getSelectorThreads());
+    int workerThreads = conf.getInt(WORKER_THREADS_CONF_KEY, getWorkerThreads());
+    int stopTimeoutVal = conf.getInt(STOP_TIMEOUT_CONF_KEY, getStopTimeoutVal());
+    int acceptQueueSizePerThread = conf.getInt(
+        ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY, getAcceptQueueSizePerThread());
+    // Fixed locale: default-locale upper-casing (e.g. Turkish) can break valueOf.
+    AcceptPolicy acceptPolicy = AcceptPolicy.valueOf(
+        conf.get(ACCEPT_POLICY_CONF_KEY, getAcceptPolicy().toString())
+            .toUpperCase(java.util.Locale.ROOT));
+    super.selectorThreads(selectorThreads).workerThreads(workerThreads)
+        .stopTimeoutVal(stopTimeoutVal)
+        .acceptQueueSizePerThread(acceptQueueSizePerThread)
+        .acceptPolicy(acceptPolicy);
+
+    LOG.info("Read configuration selectorThreads:" + selectorThreads +
+        " workerThreads:" + workerThreads +
+        " stopTimeoutVal:" + stopTimeoutVal + "sec" +
+        " acceptQueueSizePerThread:" + acceptQueueSizePerThread +
+        " acceptPolicy:" + acceptPolicy);
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
index 3fa5d41..0ed6061 100644
--- a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
@@ -18,89 +18,29 @@
package org.apache.hadoop.hbase.thrift;
-import static org.apache.hadoop.hbase.util.Bytes.getBytes;
-
-import java.io.IOException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
-import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
-import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
-import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface;
-import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor;
-import org.apache.hadoop.hbase.thrift.generated.IOError;
-import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
-import org.apache.hadoop.hbase.thrift.generated.Mutation;
-import org.apache.hadoop.hbase.thrift.generated.TCell;
-import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
-import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.generated.TScan;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.Shell.ExitCodeException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServer.AbstractServerArgs;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
-
-import com.google.common.base.Joiner;
/**
- * ThriftServer - this class starts up a Thrift server which implements the
- * Hbase API specified in the Hbase.thrift IDL file.
+ * ThriftServer - this class starts up a Thrift server which implements the
+ * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
+ * independent process.
*/
public class ThriftServer {
@@ -119,968 +59,16 @@ public class ThriftServer {
private static final int DEFAULT_LISTEN_PORT = 9090;
private Configuration conf;
- TServer server;
-
- /** An enum of server implementation selections */
- enum ImplType {
- HS_HA("hsha", true, THsHaServer.class, false),
- NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
- THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true);
-
- public static final ImplType DEFAULT = THREAD_POOL;
-
- final String option;
- final boolean isAlwaysFramed;
- final Class extends TServer> serverClass;
- final boolean canSpecifyBindIP;
-
- ImplType(String option, boolean isAlwaysFramed,
- Class extends TServer> serverClass, boolean canSpecifyBindIP) {
- this.option = option;
- this.isAlwaysFramed = isAlwaysFramed;
- this.serverClass = serverClass;
- this.canSpecifyBindIP = canSpecifyBindIP;
- }
-
- /**
- * @return -option so we can get the list of options from
- * {@link #values()}
- */
- @Override
- public String toString() {
- return "-" + option;
- }
-
- String getDescription() {
- StringBuilder sb = new StringBuilder("Use the " +
- serverClass.getSimpleName());
- if (isAlwaysFramed) {
- sb.append(" This implies the framed transport.");
- }
- if (this == DEFAULT) {
- sb.append("This is the default.");
- }
- return sb.toString();
- }
-
- static OptionGroup createOptionGroup() {
- OptionGroup group = new OptionGroup();
- for (ImplType t : values()) {
- group.addOption(new Option(t.option, t.getDescription()));
- }
- return group;
- }
-
- static ImplType getServerImpl(CommandLine cmd) {
- ImplType chosenType = null;
- int numChosen = 0;
- for (ImplType t : values()) {
- if (cmd.hasOption(t.option)) {
- chosenType = t;
- ++numChosen;
- }
- }
- if (numChosen != 1) {
- throw new AssertionError("Exactly one option out of " +
- Arrays.toString(values()) + " has to be specified");
- }
- return chosenType;
- }
-
- public String simpleClassName() {
- return serverClass.getSimpleName();
- }
-
- public static List serversThatCannotSpecifyBindIP() {
- List l = new ArrayList();
- for (ImplType t : values()) {
- if (!t.canSpecifyBindIP) {
- l.add(t.simpleClassName());
- }
- }
- return l;
- }
-
- }
-
- /**
- * The HBaseHandler is a glue object that connects Thrift RPC calls to the
- * HBase client API primarily defined in the HBaseAdmin and HTable objects.
- */
- public static class HBaseHandler implements Hbase.Iface {
- protected Configuration conf;
- protected HBaseAdmin admin = null;
- protected final Log LOG = LogFactory.getLog(this.getClass().getName());
-
- // nextScannerId and scannerMap are used to manage scanner state
- protected int nextScannerId = 0;
- protected HashMap scannerMap = null;
-
- private static ThreadLocal