diff --git a/pom.xml b/pom.xml index 6566a1c..da0cc45 100644 --- a/pom.xml +++ b/pom.xml @@ -887,7 +887,7 @@ 2.4.0a 1.5.8 1.0.1 - 0.7.0 + 0.8.0 3.4.2 0.0.1-SNAPSHOT diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java index 996a289..5193299 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.List; @@ -34,24 +32,10 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.thrift.generated.Hbase; import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.TRowResult; -import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer; -import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer.Args; -import org.apache.hadoop.hbase.thrift.ThriftServer; +import org.apache.hadoop.hbase.thrift.ThriftServerRunner; import org.apache.hadoop.hbase.thrift.ThriftUtilities; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.server.TNonblockingServer; -import org.apache.thrift.server.TServer; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TNonblockingServerSocket; -import org.apache.thrift.transport.TNonblockingServerTransport; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TServerTransport; -import org.apache.thrift.transport.TTransportFactory; /** * HRegionThriftServer - this class starts up a Thrift server in the same @@ -68,43 +52,59 @@ public class HRegionThriftServer extends Thread { public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class); public static final int DEFAULT_LISTEN_PORT = 9090; - private HRegionServer rs; - private Configuration conf; - - private int port; - private boolean nonblocking; - private String bindIpAddress; - private String transport; - private String protocol; - volatile private TServer tserver; - - /** - * Whether requests should be redirected to other RegionServers if the - * specified region is not hosted by this RegionServer. - */ - private boolean redirect; + private final HRegionServer rs; + private final ThriftServerRunner serverRunner; /** * Create an instance of the glue object that connects the * RegionServer with the standard ThriftServer implementation */ - HRegionThriftServer(HRegionServer regionServer, Configuration conf) { + HRegionThriftServer(HRegionServer regionServer, Configuration conf) + throws IOException { + super("Region Thrift Server"); this.rs = regionServer; - this.conf = conf; + this.serverRunner = + new ThriftServerRunner(conf, new HBaseHandlerRegion(conf)); } /** - * Inherit the Handler from the standard ThriftServer. This allows us + * Stop ThriftServer + */ + void shutdown() { + serverRunner.shutdown(); + } + + @Override + public void run() { + serverRunner.run(); + } + + /** + * Inherit the Handler from the standard ThriftServerRunner. 
This allows us * to use the default implementation for most calls. We override certain calls * for performance reasons */ - private class HBaseHandlerRegion extends ThriftServer.HBaseHandler { + private class HBaseHandlerRegion extends ThriftServerRunner.HBaseHandler { + + /** + * Whether requests should be redirected to other RegionServers if the + * specified region is not hosted by this RegionServer. + */ + private boolean redirect; HBaseHandlerRegion(final Configuration conf) throws IOException { super(conf); initialize(conf); } + /** + * Read and initialize config parameters + */ + private void initialize(Configuration conf) { + this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect", + false); + } + // TODO: Override more methods to short-circuit for performance /** @@ -153,91 +153,4 @@ public class HRegionThriftServer extends Thread { } } } - - /** - * Read and initialize config parameters - */ - private void initialize(Configuration conf) { - this.port = conf.getInt("hbase.regionserver.thrift.port", - DEFAULT_LISTEN_PORT); - this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress"); - this.protocol = conf.get("hbase.regionserver.thrift.protocol"); - this.transport = conf.get("hbase.regionserver.thrift.transport"); - this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking", - false); - this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect", - false); - } - - /** - * Stop ThriftServer - */ - void shutdown() { - if (tserver != null) { - tserver.stop(); - tserver = null; - } - } - - @Override - public void run() { - try { - HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf); - Hbase.Processor processor = - new Hbase.Processor(handler); - - TProtocolFactory protocolFactory; - if (this.protocol != null && this.protocol.equals("compact")) { - protocolFactory = new TCompactProtocol.Factory(); - } else { - protocolFactory = new TBinaryProtocol.Factory(); - } - - if (this.nonblocking) { - TNonblockingServerTransport serverTransport = - new TNonblockingServerSocket(this.port); - TFramedTransport.Factory transportFactory = - new TFramedTransport.Factory(); - - TNonblockingServer.Args serverArgs = - new TNonblockingServer.Args(serverTransport); - serverArgs.processor(processor); - serverArgs.transportFactory(transportFactory); - serverArgs.protocolFactory(protocolFactory); - LOG.info("starting HRegionServer Nonblocking Thrift server on " + - this.port); - LOG.info("HRegionServer Nonblocking Thrift server does not " + - "support address binding."); - tserver = new TNonblockingServer(serverArgs); - } else { - InetAddress listenAddress = null; - if (this.bindIpAddress != null) { - listenAddress = InetAddress.getByName(this.bindIpAddress); - } else { - listenAddress = InetAddress.getLocalHost(); - } - TServerTransport serverTransport = new TServerSocket( - new InetSocketAddress(listenAddress, port)); - - TTransportFactory transportFactory; - if (this.transport != null && this.transport.equals("framed")) { - transportFactory = new TFramedTransport.Factory(); - } else { - transportFactory = new TTransportFactory(); - } - - TBoundedThreadPoolServer.Args serverArgs = - new TBoundedThreadPoolServer.Args(serverTransport, conf); - serverArgs.processor(processor); - serverArgs.protocolFactory(protocolFactory); - serverArgs.transportFactory(transportFactory); - LOG.info("starting HRegionServer ThreadPool Thrift server on " + - listenAddress + ":" + this.port); - tserver = new TBoundedThreadPoolServer(serverArgs); - } - tserver.serve(); - } 
catch (Exception e) { - LOG.warn("Unable to start HRegionServerThrift interface.", e); - } - } } diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/HbaseTThreadedSelectorServerArgs.java b/src/main/java/org/apache/hadoop/hbase/thrift/HbaseTThreadedSelectorServerArgs.java new file mode 100644 index 0000000..cd26ab6 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/HbaseTThreadedSelectorServerArgs.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TThreadedSelectorServer.Args that reads the Hadoop configuration + */ +public class HbaseTThreadedSelectorServerArgs + extends TThreadedSelectorServer.Args { + + private static final Logger LOG = + LoggerFactory.getLogger(HbaseTThreadedSelectorServerArgs.class); + + /** + * Number of selector threads for reading and writing to the socket + */ + public static final String SELECTOR_THREADS_CONF_KEY = + "hbase.thrift.selector.threads"; + + /** + * Number of threads for processing the thrift calls + */ + public static final String WORKER_THREADS_CONF_KEY = + "hbase.thrift.worker.threads"; + + /** + * Time to wait for the server to stop gracefully + */ + public static final String STOP_TIMEOUT_CONF_KEY = + "hbase.thrift.stop.timeout.seconds"; + + /** + * Maximum number of accepted elements per selector + */ + public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY = + "hbase.thrift.accept.queue.size.per.selector"; + + /** + * The strategy for handling new accepted connections. 
+ */ + public static final String ACCEPT_POLICY_CONF_KEY = + "hbase.thrift.accept.policy"; + + public HbaseTThreadedSelectorServerArgs( + TNonblockingServerTransport transport, Configuration conf) { + super(transport); + readConf(conf); + } + + private void readConf(Configuration conf) { + int selectorThreads = conf.getInt( + SELECTOR_THREADS_CONF_KEY, getSelectorThreads()); + int workerThreads = conf.getInt( + WORKER_THREADS_CONF_KEY, getWorkerThreads()); + int stopTimeoutVal = conf.getInt(STOP_TIMEOUT_CONF_KEY, getStopTimeoutVal()); + int acceptQueueSizePerThread = conf.getInt( + ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY, getAcceptQueueSizePerThread()); + AcceptPolicy acceptPolicy = AcceptPolicy.valueOf(conf.get( + ACCEPT_POLICY_CONF_KEY, getAcceptPolicy().toString()).toUpperCase()); + + super.selectorThreads(selectorThreads) + .workerThreads(workerThreads) + .stopTimeoutVal(stopTimeoutVal) + .acceptQueueSizePerThread(acceptQueueSizePerThread) + .acceptPolicy(acceptPolicy); + + LOG.info("Read configuration selectorThreads:" + selectorThreads + + " workerThreads:" + workerThreads + + " stopTimeoutVal:" + stopTimeoutVal + "sec" + + " acceptQueueSizePerThread:" + acceptQueueSizePerThread + + " acceptPolicy:" + acceptPolicy); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java index 3fa5d41..0ed6061 100644 --- a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java +++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java @@ -18,89 +18,29 @@ package org.apache.hadoop.hbase.thrift; -import static org.apache.hadoop.hbase.util.Bytes.getBytes; - -import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.TreeMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.ParseFilter; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.filter.WhileMatchFilter; -import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; -import org.apache.hadoop.hbase.thrift.generated.BatchMutation; 
-import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; -import org.apache.hadoop.hbase.thrift.generated.Hbase; -import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface; -import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor; -import org.apache.hadoop.hbase.thrift.generated.IOError; -import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; -import org.apache.hadoop.hbase.thrift.generated.Mutation; -import org.apache.hadoop.hbase.thrift.generated.TCell; -import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; -import org.apache.hadoop.hbase.thrift.generated.TRowResult; -import org.apache.hadoop.hbase.thrift.generated.TScan; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.util.Shell.ExitCodeException; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.server.THsHaServer; -import org.apache.thrift.server.TNonblockingServer; -import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TServer.AbstractServerArgs; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TNonblockingServerSocket; -import org.apache.thrift.transport.TNonblockingServerTransport; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TServerTransport; -import org.apache.thrift.transport.TTransportFactory; - -import com.google.common.base.Joiner; /** - * ThriftServer - this class starts up a Thrift server which implements the - * Hbase API specified in the Hbase.thrift IDL file. + * ThriftServer - this class starts up a Thrift server which implements the + * Hbase API specified in the Hbase.thrift IDL file. The server runs in an + * independent process. 
*/ public class ThriftServer { @@ -119,968 +59,16 @@ public class ThriftServer { private static final int DEFAULT_LISTEN_PORT = 9090; private Configuration conf; - TServer server; - - /** An enum of server implementation selections */ - enum ImplType { - HS_HA("hsha", true, THsHaServer.class, false), - NONBLOCKING("nonblocking", true, TNonblockingServer.class, false), - THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true); - - public static final ImplType DEFAULT = THREAD_POOL; - - final String option; - final boolean isAlwaysFramed; - final Class serverClass; - final boolean canSpecifyBindIP; - - ImplType(String option, boolean isAlwaysFramed, - Class serverClass, boolean canSpecifyBindIP) { - this.option = option; - this.isAlwaysFramed = isAlwaysFramed; - this.serverClass = serverClass; - this.canSpecifyBindIP = canSpecifyBindIP; - } - - /** - * @return -option so we can get the list of options from - * {@link #values()} - */ - @Override - public String toString() { - return "-" + option; - } - - String getDescription() { - StringBuilder sb = new StringBuilder("Use the " + - serverClass.getSimpleName()); - if (isAlwaysFramed) { - sb.append(" This implies the framed transport."); - } - if (this == DEFAULT) { - sb.append("This is the default."); - } - return sb.toString(); - } - - static OptionGroup createOptionGroup() { - OptionGroup group = new OptionGroup(); - for (ImplType t : values()) { - group.addOption(new Option(t.option, t.getDescription())); - } - return group; - } - - static ImplType getServerImpl(CommandLine cmd) { - ImplType chosenType = null; - int numChosen = 0; - for (ImplType t : values()) { - if (cmd.hasOption(t.option)) { - chosenType = t; - ++numChosen; - } - } - if (numChosen != 1) { - throw new AssertionError("Exactly one option out of " + - Arrays.toString(values()) + " has to be specified"); - } - return chosenType; - } - - public String simpleClassName() { - return serverClass.getSimpleName(); - } - - public static List serversThatCannotSpecifyBindIP() { - List l = new ArrayList(); - for (ImplType t : values()) { - if (!t.canSpecifyBindIP) { - l.add(t.simpleClassName()); - } - } - return l; - } - - } - - /** - * The HBaseHandler is a glue object that connects Thrift RPC calls to the - * HBase client API primarily defined in the HBaseAdmin and HTable objects. - */ - public static class HBaseHandler implements Hbase.Iface { - protected Configuration conf; - protected HBaseAdmin admin = null; - protected final Log LOG = LogFactory.getLog(this.getClass().getName()); - - // nextScannerId and scannerMap are used to manage scanner state - protected int nextScannerId = 0; - protected HashMap scannerMap = null; - - private static ThreadLocal> threadLocalTables = new ThreadLocal>() { - @Override - protected Map initialValue() { - return new TreeMap(); - } - }; - - /** - * Returns a list of all the column families for a given htable. - * - * @param table - * @return - * @throws IOException - */ - byte[][] getAllColumns(HTable table) throws IOException { - HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); - byte[][] columns = new byte[cds.length][]; - for (int i = 0; i < cds.length; i++) { - columns[i] = Bytes.add(cds[i].getName(), - KeyValue.COLUMN_FAMILY_DELIM_ARRAY); - } - return columns; - } - - /** - * Creates and returns an HTable instance from a given table name. 
- * - * @param tableName - * name of table - * @return HTable object - * @throws IOException - * @throws IOError - */ - protected HTable getTable(final byte[] tableName) throws - IOException { - String table = new String(tableName); - Map tables = threadLocalTables.get(); - if (!tables.containsKey(table)) { - tables.put(table, new HTable(conf, tableName)); - } - return tables.get(table); - } - - protected HTable getTable(final ByteBuffer tableName) throws IOException { - return getTable(getBytes(tableName)); - } - - /** - * Assigns a unique ID to the scanner and adds the mapping to an internal - * hash-map. - * - * @param scanner - * @return integer scanner id - */ - protected synchronized int addScanner(ResultScanner scanner) { - int id = nextScannerId++; - scannerMap.put(id, scanner); - return id; - } - - /** - * Returns the scanner associated with the specified ID. - * - * @param id - * @return a Scanner, or null if ID was invalid. - */ - protected synchronized ResultScanner getScanner(int id) { - return scannerMap.get(id); - } - - /** - * Removes the scanner associated with the specified ID from the internal - * id->scanner hash-map. - * - * @param id - * @return a Scanner, or null if ID was invalid. - */ - protected synchronized ResultScanner removeScanner(int id) { - return scannerMap.remove(id); - } - - /** - * Constructs an HBaseHandler object. - * @throws IOException - */ - protected HBaseHandler() - throws IOException { - this(HBaseConfiguration.create()); - } - - protected HBaseHandler(final Configuration c) - throws IOException { - this.conf = c; - admin = new HBaseAdmin(conf); - scannerMap = new HashMap(); - } - - @Override - public void enableTable(ByteBuffer tableName) throws IOError { - try{ - admin.enableTable(getBytes(tableName)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void disableTable(ByteBuffer tableName) throws IOError{ - try{ - admin.disableTable(getBytes(tableName)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public boolean isTableEnabled(ByteBuffer tableName) throws IOError { - try { - return HTable.isTableEnabled(this.conf, getBytes(tableName)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void compact(ByteBuffer tableNameOrRegionName) throws IOError { - try{ - admin.compact(getBytes(tableNameOrRegionName)); - } catch (InterruptedException e) { - throw new IOError(e.getMessage()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { - try{ - admin.majorCompact(getBytes(tableNameOrRegionName)); - } catch (InterruptedException e) { - throw new IOError(e.getMessage()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getTableNames() throws IOError { - try { - HTableDescriptor[] tables = this.admin.listTables(); - ArrayList list = new ArrayList(tables.length); - for (int i = 0; i < tables.length; i++) { - list.add(ByteBuffer.wrap(tables[i].getName())); - } - return list; - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getTableRegions(ByteBuffer tableName) - throws IOError { - try{ - List hris = this.admin.getTableRegions(tableName.array()); - List regions = new ArrayList(); - - if (hris != null) { - for (HRegionInfo regionInfo : hris){ - TRegionInfo region = new TRegionInfo(); - region.startKey = 
ByteBuffer.wrap(regionInfo.getStartKey()); - region.endKey = ByteBuffer.wrap(regionInfo.getEndKey()); - region.id = regionInfo.getRegionId(); - region.name = ByteBuffer.wrap(regionInfo.getRegionName()); - region.version = regionInfo.getVersion(); - regions.add(region); - } - } - return regions; - } catch (IOException e){ - throw new IOError(e.getMessage()); - } - } - - @Deprecated - @Override - public List get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) - throws IOError { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return get(tableName, row, famAndQf[0], new byte[0]); - } - return get(tableName, row, famAndQf[0], famAndQf[1]); - } - - protected List get(ByteBuffer tableName, - ByteBuffer row, - byte[] family, - byte[] qualifier) throws IOError { - try { - HTable table = getTable(tableName); - Get get = new Get(getBytes(row)); - if (qualifier == null || qualifier.length == 0) { - get.addFamily(family); - } else { - get.addColumn(family, qualifier); - } - Result result = table.get(get); - return ThriftUtilities.cellFromHBase(result.raw()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Deprecated - @Override - public List getVer(ByteBuffer tableName, ByteBuffer row, - ByteBuffer column, int numVersions) throws IOError { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return getVer(tableName, row, famAndQf[0], - new byte[0], numVersions); - } - return getVer(tableName, row, - famAndQf[0], famAndQf[1], numVersions); - } - - public List getVer(ByteBuffer tableName, ByteBuffer row, - byte[] family, - byte[] qualifier, int numVersions) throws IOError { - try { - HTable table = getTable(tableName); - Get get = new Get(getBytes(row)); - get.addColumn(family, qualifier); - get.setMaxVersions(numVersions); - Result result = table.get(get); - return ThriftUtilities.cellFromHBase(result.raw()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Deprecated - @Override - public List getVerTs(ByteBuffer tableName, - ByteBuffer row, - ByteBuffer column, - long timestamp, - int numVersions) throws IOError { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp, - numVersions); - } - return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, - numVersions); - } - - protected List getVerTs(ByteBuffer tableName, - ByteBuffer row, byte [] family, - byte [] qualifier, long timestamp, int numVersions) throws IOError { - try { - HTable table = getTable(tableName); - Get get = new Get(getBytes(row)); - get.addColumn(family, qualifier); - get.setTimeRange(Long.MIN_VALUE, timestamp); - get.setMaxVersions(numVersions); - Result result = table.get(get); - return ThriftUtilities.cellFromHBase(result.raw()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getRow(ByteBuffer tableName, ByteBuffer row) - throws IOError { - return getRowWithColumnsTs(tableName, row, null, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowWithColumns(ByteBuffer tableName, - ByteBuffer row, - List columns) throws IOError { - return getRowWithColumnsTs(tableName, row, columns, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowTs(ByteBuffer tableName, ByteBuffer row, - long timestamp) throws IOError { - return getRowWithColumnsTs(tableName, row, null, - timestamp); - } - - 
@Override - public List getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row, - List columns, long timestamp) throws IOError { - try { - HTable table = getTable(tableName); - if (columns == null) { - Get get = new Get(getBytes(row)); - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); - return ThriftUtilities.rowResultFromHBase(result); - } - Get get = new Get(getBytes(row)); - for(ByteBuffer column : columns) { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if (famAndQf.length == 1) { - get.addFamily(famAndQf[0]); - } else { - get.addColumn(famAndQf[0], famAndQf[1]); - } - } - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); - return ThriftUtilities.rowResultFromHBase(result); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getRows(ByteBuffer tableName, - List rows) - throws IOError { - return getRowsWithColumnsTs(tableName, rows, null, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowsWithColumns(ByteBuffer tableName, - List rows, - List columns) throws IOError { - return getRowsWithColumnsTs(tableName, rows, columns, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowsTs(ByteBuffer tableName, - List rows, - long timestamp) throws IOError { - return getRowsWithColumnsTs(tableName, rows, null, - timestamp); - } - - @Override - public List getRowsWithColumnsTs(ByteBuffer tableName, - List rows, - List columns, long timestamp) throws IOError { - try { - List gets = new ArrayList(rows.size()); - HTable table = getTable(tableName); - for (ByteBuffer row : rows) { - Get get = new Get(getBytes(row)); - if (columns != null) { - - for(ByteBuffer column : columns) { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if (famAndQf.length == 1) { - get.addFamily(famAndQf[0]); - } else { - get.addColumn(famAndQf[0], famAndQf[1]); - } - } - get.setTimeRange(Long.MIN_VALUE, timestamp); - } - gets.add(get); - } - Result[] result = table.get(gets); - return ThriftUtilities.rowResultFromHBase(result); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) - throws IOError { - deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void deleteAllTs(ByteBuffer tableName, - ByteBuffer row, - ByteBuffer column, - long timestamp) throws IOError { - try { - HTable table = getTable(tableName); - Delete delete = new Delete(getBytes(row)); - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if (famAndQf.length == 1) { - delete.deleteFamily(famAndQf[0], timestamp); - } else { - delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); - } - table.delete(delete); + ThriftServerRunner serverRunner; - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void deleteAllRow(ByteBuffer tableName, ByteBuffer row) throws IOError { - deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp) - throws IOError { - try { - HTable table = getTable(tableName); - Delete delete = new Delete(getBytes(row), timestamp, null); - table.delete(delete); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void createTable(ByteBuffer in_tableName, - List columnFamilies) throws IOError, - IllegalArgument, 
AlreadyExists { - byte [] tableName = getBytes(in_tableName); - try { - if (admin.tableExists(tableName)) { - throw new AlreadyExists("table name already in use"); - } - HTableDescriptor desc = new HTableDescriptor(tableName); - for (ColumnDescriptor col : columnFamilies) { - HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); - desc.addFamily(colDesc); - } - admin.createTable(desc); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } catch (IllegalArgumentException e) { - throw new IllegalArgument(e.getMessage()); - } - } - - @Override - public void deleteTable(ByteBuffer in_tableName) throws IOError { - byte [] tableName = getBytes(in_tableName); - if (LOG.isDebugEnabled()) { - LOG.debug("deleteTable: table=" + Bytes.toString(tableName)); - } - try { - if (!admin.tableExists(tableName)) { - throw new IOError("table does not exist"); - } - admin.deleteTable(tableName); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void mutateRow(ByteBuffer tableName, ByteBuffer row, - List mutations) throws IOError, IllegalArgument { - mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, - List mutations, long timestamp) throws IOError, IllegalArgument { - HTable table = null; - try { - table = getTable(tableName); - Put put = new Put(getBytes(row), timestamp, null); - - Delete delete = new Delete(getBytes(row)); - - // I apologize for all this mess :) - for (Mutation m : mutations) { - byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); - if (m.isDelete) { - if (famAndQf.length == 1) { - delete.deleteFamily(famAndQf[0], timestamp); - } else { - delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); - } - } else { - if(famAndQf.length == 1) { - put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, - m.value != null ? m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } else { - put.add(famAndQf[0], famAndQf[1], - m.value != null ? m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } - } - } - if (!delete.isEmpty()) - table.delete(delete); - if (!put.isEmpty()) - table.put(put); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } catch (IllegalArgumentException e) { - throw new IllegalArgument(e.getMessage()); - } - } - - @Override - public void mutateRows(ByteBuffer tableName, List rowBatches) - throws IOError, IllegalArgument, TException { - mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void mutateRowsTs(ByteBuffer tableName, List rowBatches, long timestamp) - throws IOError, IllegalArgument, TException { - List puts = new ArrayList(); - List deletes = new ArrayList(); - - for (BatchMutation batch : rowBatches) { - byte[] row = getBytes(batch.row); - List mutations = batch.mutations; - Delete delete = new Delete(row); - Put put = new Put(row, timestamp, null); - for (Mutation m : mutations) { - byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); - if (m.isDelete) { - // no qualifier, family only. - if (famAndQf.length == 1) { - delete.deleteFamily(famAndQf[0], timestamp); - } else { - delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); - } - } else { - if(famAndQf.length == 1) { - put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, - m.value != null ? m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } else { - put.add(famAndQf[0], famAndQf[1], - m.value != null ? 
m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } - } - } - if (!delete.isEmpty()) - deletes.add(delete); - if (!put.isEmpty()) - puts.add(put); - } - - HTable table = null; - try { - table = getTable(tableName); - if (!puts.isEmpty()) - table.put(puts); - for (Delete del : deletes) { - table.delete(del); - } - } catch (IOException e) { - throw new IOError(e.getMessage()); - } catch (IllegalArgumentException e) { - throw new IllegalArgument(e.getMessage()); - } - } - - @Deprecated - @Override - public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, - long amount) throws IOError, IllegalArgument, TException { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return atomicIncrement(tableName, row, famAndQf[0], new byte[0], - amount); - } - return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); - } - - protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte [] family, - byte [] qualifier, long amount) - throws IOError, IllegalArgument, TException { - HTable table; - try { - table = getTable(tableName); - return table.incrementColumnValue(getBytes(row), family, qualifier, amount); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - public void scannerClose(int id) throws IOError, IllegalArgument { - LOG.debug("scannerClose: id=" + id); - ResultScanner scanner = getScanner(id); - if (scanner == null) { - throw new IllegalArgument("scanner ID is invalid"); - } - scanner.close(); - removeScanner(id); - } - - @Override - public List scannerGetList(int id,int nbRows) throws IllegalArgument, IOError { - LOG.debug("scannerGetList: id=" + id); - ResultScanner scanner = getScanner(id); - if (null == scanner) { - throw new IllegalArgument("scanner ID is invalid"); - } - - Result [] results = null; - try { - results = scanner.next(nbRows); - if (null == results) { - return new ArrayList(); - } - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - return ThriftUtilities.rowResultFromHBase(results); - } - - @Override - public List scannerGet(int id) throws IllegalArgument, IOError { - return scannerGetList(id,1); - } - - public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(); - if (tScan.isSetStartRow()) { - scan.setStartRow(tScan.getStartRow()); - } - if (tScan.isSetStopRow()) { - scan.setStopRow(tScan.getStopRow()); - } - if (tScan.isSetTimestamp()) { - scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp()); - } - if (tScan.isSetCaching()) { - scan.setCaching(tScan.getCaching()); - } - if (tScan.isSetColumns() && tScan.getColumns().size() != 0) { - for(ByteBuffer column : tScan.getColumns()) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - if (tScan.isSetFilterString()) { - ParseFilter parseFilter = new ParseFilter(); - scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString())); - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, - List columns) throws IOError { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow)); - if(columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { - byte [][] famQf = 
KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, - ByteBuffer stopRow, List columns) throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); - if(columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenWithPrefix(ByteBuffer tableName, - ByteBuffer startAndPrefix, - List columns) - throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startAndPrefix)); - Filter f = new WhileMatchFilter( - new PrefixFilter(getBytes(startAndPrefix))); - scan.setFilter(f); - if (columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, - List columns, long timestamp) throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow)); - scan.setTimeRange(Long.MIN_VALUE, timestamp); - if (columns != null && columns.size() != 0) { - for (ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, - ByteBuffer stopRow, List columns, long timestamp) - throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); - scan.setTimeRange(Long.MIN_VALUE, timestamp); - if (columns != null && columns.size() != 0) { - for (ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - scan.setTimeRange(Long.MIN_VALUE, timestamp); - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public Map getColumnDescriptors( - ByteBuffer tableName) throws IOError, TException { - try { - TreeMap columns = - new TreeMap(); - - HTable table = getTable(tableName); - HTableDescriptor desc = table.getTableDescriptor(); - - for (HColumnDescriptor e : desc.getFamilies()) { - ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); - columns.put(col.name, col); - } - return columns; - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override 
- public List getRowOrBefore(ByteBuffer tableName, ByteBuffer row, - ByteBuffer family) throws IOError { - try { - HTable table = getTable(getBytes(tableName)); - Result result = table.getRowOrBefore(getBytes(row), getBytes(family)); - return ThriftUtilities.cellFromHBase(result.raw()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { - try { - HTable table = getTable(HConstants.META_TABLE_NAME); - Result startRowResult = table.getRowOrBefore( - searchRow.array(), HConstants.CATALOG_FAMILY); - - if (startRowResult == null) { - throw new IOException("Cannot find row in .META., row=" - + Bytes.toString(searchRow.array())); - } - - // find region start and end keys - byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY, - HConstants.REGIONINFO_QUALIFIER); - if (value == null || value.length == 0) { - throw new IOException("HRegionInfo REGIONINFO was null or " + - " empty in Meta for row=" - + Bytes.toString(searchRow.array())); - } - HRegionInfo regionInfo = Writables.getHRegionInfo(value); - TRegionInfo region = new TRegionInfo(); - region.setStartKey(regionInfo.getStartKey()); - region.setEndKey(regionInfo.getEndKey()); - region.id = regionInfo.getRegionId(); - region.setName(regionInfo.getRegionName()); - region.version = regionInfo.getVersion(); - - // find region assignment to server - value = startRowResult.getValue(HConstants.CATALOG_FAMILY, - HConstants.SERVER_QUALIFIER); - if (value != null && value.length > 0) { - String hostAndPort = Bytes.toString(value); - region.setServerName(Bytes.toBytes( - Addressing.parseHostname(hostAndPort))); - region.port = Addressing.parsePort(hostAndPort); - } - return region; - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - } + + // + // Main program and support routines + // public ThriftServer(Configuration conf) { this.conf = HBaseConfiguration.create(conf); } - // - // Main program and support routines - // - private static void printUsageAndExit(Options options, int exitCode) throws ExitCodeException { HelpFormatter formatter = new HelpFormatter(); @@ -1092,11 +80,20 @@ public class ThriftServer { throw new ExitCodeException(exitCode, ""); } - /* + /** * Starts up or shuts down the Thrift server, depending on the arguments. * @param args */ void doMain(final String[] args) throws Exception { + processOptions(args); + serverRunner = new ThriftServerRunner(conf); + serverRunner.run(); + } + + /** + * Parse the command line options to set parameters in the conf. + */ + private void processOptions(final String[] args) throws Exception { Options options = new Options(); options.addOption("b", BIND_OPTION, true, "Address to bind " + "the Thrift server to. 
Not supported by the Nonblocking and " + @@ -1143,10 +140,10 @@ public class ThriftServer { } // Get port to bind to - int listenPort = 0; try { - listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION, + int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION, String.valueOf(DEFAULT_LISTEN_PORT))); + conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort); } catch (NumberFormatException e) { LOG.error("Could not parse the value provided for the port option", e); printUsageAndExit(options, -1); @@ -1162,103 +159,14 @@ public class ThriftServer { optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); - // Construct correct ProtocolFactory - TProtocolFactory protocolFactory; - if (cmd.hasOption(COMPACT_OPTION)) { - LOG.debug("Using compact protocol"); - protocolFactory = new TCompactProtocol.Factory(); - } else { - LOG.debug("Using binary protocol"); - protocolFactory = new TBinaryProtocol.Factory(); - } - - HBaseHandler handler = new HBaseHandler(conf); - Hbase.Processor processor = - new Hbase.Processor(handler); - ImplType implType = ImplType.getServerImpl(cmd); - - // Construct correct TransportFactory - TTransportFactory transportFactory; - if (cmd.hasOption(FRAMED_OPTION) || implType.isAlwaysFramed) { - transportFactory = new TFramedTransport.Factory(); - LOG.debug("Using framed transport"); - } else { - transportFactory = new TTransportFactory(); - } - - if (cmd.hasOption(BIND_OPTION) && !implType.canSpecifyBindIP) { - LOG.error("Server types " + Joiner.on(", ").join( - ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " + - "address binding at the moment. See " + - "https://issues.apache.org/jira/browse/HBASE-2155 for details."); - printUsageAndExit(options, -1); - } - - if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING) { - if (cmd.hasOption(BIND_OPTION)) { - throw new RuntimeException("-" + BIND_OPTION + " not supported with " + - implType); - } - - TNonblockingServerTransport serverTransport = - new TNonblockingServerSocket(listenPort); - - if (implType == ImplType.NONBLOCKING) { - TNonblockingServer.Args serverArgs = - new TNonblockingServer.Args(serverTransport); - setServerArgs(serverArgs, processor, transportFactory, - protocolFactory); - server = new TNonblockingServer(serverArgs); - } else { - THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); - serverArgs.processor(processor); - serverArgs.transportFactory(transportFactory); - serverArgs.protocolFactory(protocolFactory); - server = new THsHaServer(serverArgs); - } - LOG.info("starting HBase " + implType.simpleClassName() + - " server on " + Integer.toString(listenPort)); - } else if (implType == ImplType.THREAD_POOL) { - // Thread pool server. Get the IP address to bind to. 
- InetAddress listenAddress = getBindAddress(options, cmd); - - TServerTransport serverTransport = new TServerSocket( - new InetSocketAddress(listenAddress, listenPort)); - - TBoundedThreadPoolServer.Args serverArgs = new TBoundedThreadPoolServer.Args( - serverTransport, conf); - setServerArgs(serverArgs, processor, transportFactory, protocolFactory); - LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " - + listenAddress + ":" + Integer.toString(listenPort) - + "; " + serverArgs); - server = new TBoundedThreadPoolServer(serverArgs); - } else { - throw new AssertionError("Unsupported Thrift server implementation: " + - implType.simpleClassName()); - } - - // A sanity check that we instantiated the right type of server. - if (server.getClass() != implType.serverClass) { - throw new AssertionError("Expected to create Thrift server class " + - implType.serverClass.getName() + " but got " + - server.getClass().getName()); - } - - // login the server principal (if using secure Hadoop) - Configuration conf = handler.conf; - if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) { - String machineName = Strings.domainNamePointerToHostName( - DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"), - conf.get("hbase.thrift.dns.nameserver", "default"))); - User.login(conf, "hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", - machineName); + // Set general thrift server options + conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, cmd.hasOption(COMPACT_OPTION)); + conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, cmd.hasOption(FRAMED_OPTION)); + if (cmd.hasOption(BIND_OPTION)) { + conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION)); } - server.serve(); - } - - public void stop() { - server.stop(); + ImplType.setServerImpl(cmd, conf); } private InetAddress getBindAddress(Options options, CommandLine cmd) @@ -1275,18 +183,16 @@ public class ThriftServer { return listenAddress; } - private static void setServerArgs(AbstractServerArgs serverArgs, - Processor processor, TTransportFactory transportFactory, - TProtocolFactory protocolFactory) { - serverArgs.processor(processor); - serverArgs.transportFactory(transportFactory); - serverArgs.protocolFactory(protocolFactory); + public void stop() { + serverRunner.shutdown(); } private static void optionToConf(CommandLine cmd, String option, Configuration conf, String destConfKey) { if (cmd.hasOption(option)) { - conf.set(destConfKey, cmd.getOptionValue(option)); + String value = cmd.getOptionValue(option); + LOG.info("Set configuration key:" + destConfKey + " value:" + value); + conf.set(destConfKey, value); } } diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java new file mode 100644 index 0000000..d975c3a --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -0,0 +1,1227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import static org.apache.hadoop.hbase.util.Bytes.getBytes; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; +import org.apache.hadoop.hbase.thrift.generated.BatchMutation; +import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface; +import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor; +import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; +import org.apache.hadoop.hbase.thrift.generated.IOError; +import org.apache.hadoop.hbase.thrift.generated.Mutation; +import org.apache.hadoop.hbase.thrift.generated.TCell; +import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; +import org.apache.hadoop.hbase.thrift.generated.TRowResult; +import org.apache.hadoop.hbase.thrift.generated.TScan; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.net.DNS; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.THsHaServer; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.server.TServer; +import 
org.apache.thrift.server.TServer.AbstractServerArgs; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportFactory; + +import com.google.common.base.Joiner; + +/** + * ThriftServerRunner - this class starts up a Thrift server which implements the + * Hbase API specified in the Hbase.thrift IDL file. + */ +public class ThriftServerRunner implements Runnable { + + private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class); + + static final String SERVER_TYPE_CONF_KEY = "hadoop.regionserver.thrift.server.type"; + static final String BIND_CONF_KEY = "hadoop.regionserver.thrift.ipaddress"; + static final String COMPACT_CONF_KEY = "hadoop.regionserver.thrift.compact"; + static final String FRAMED_CONF_KEY = "hadoop.regionserver.thrift.framed"; + static final String PORT_CONF_KEY = "hadoop.regionserver.thrift.port"; + + private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; + private static final int DEFAULT_LISTEN_PORT = 9090; + + private Configuration conf; + volatile TServer tserver; + private final HBaseHandler handler; + + /** An enum of server implementation selections */ + enum ImplType { + HS_HA("hsha", true, THsHaServer.class, false), + NONBLOCKING("nonblocking", true, TNonblockingServer.class, false), + THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true), + THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, false); + + public static final ImplType DEFAULT = THREAD_POOL; + + final String option; + final boolean isAlwaysFramed; + final Class serverClass; + final boolean canSpecifyBindIP; + + ImplType(String option, boolean isAlwaysFramed, + Class serverClass, boolean canSpecifyBindIP) { + this.option = option; + this.isAlwaysFramed = isAlwaysFramed; + this.serverClass = serverClass; + this.canSpecifyBindIP = canSpecifyBindIP; + } + + /** + * @return -option so we can get the list of options from + * {@link #values()} + */ + @Override + public String toString() { + return "-" + option; + } + + String getDescription() { + StringBuilder sb = new StringBuilder("Use the " + + serverClass.getSimpleName()); + if (isAlwaysFramed) { + sb.append(" This implies the framed transport."); + } + if (this == DEFAULT) { + sb.append(" This is the default."); + } + return sb.toString(); + } + + static OptionGroup createOptionGroup() { + OptionGroup group = new OptionGroup(); + for (ImplType t : values()) { + group.addOption(new Option(t.option, t.getDescription())); + } + return group; + } + + static ImplType getServerImpl(Configuration conf) { + String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option); + for (ImplType t : values()) { + if (confType.equals(t.option)) { + return t; + } + } + // Cannot happen when the type was set through setServerImpl() + throw new AssertionError("Unknown server type configured: " + confType); + } + + static void setServerImpl(CommandLine cmd, Configuration conf) { + ImplType chosenType = null; + int numChosen = 0; + for (ImplType t : values()) { + if (cmd.hasOption(t.option)) { + chosenType = t; + ++numChosen; + } + } + if (numChosen != 1) { + throw new AssertionError("Exactly one option out of " + + Arrays.toString(values()) + " has to be specified"); + } + LOG.info("Setting thrift server to " + chosenType.option); + conf.set(SERVER_TYPE_CONF_KEY, 
chosenType.option); + } + + public String simpleClassName() { + return serverClass.getSimpleName(); + } + + public static List serversThatCannotSpecifyBindIP() { + List l = new ArrayList(); + for (ImplType t : values()) { + if (!t.canSpecifyBindIP) { + l.add(t.simpleClassName()); + } + } + return l; + } + + } + + public ThriftServerRunner(Configuration conf) throws IOException { + this(conf, new ThriftServerRunner.HBaseHandler(conf)); + } + + public ThriftServerRunner(Configuration conf, HBaseHandler handler) { + this.conf = HBaseConfiguration.create(conf); + this.handler = handler; + } + + /* + * Runs the Thrift server + */ + @Override + public void run() { + try { + setupServer(); + tserver.serve(); + } catch (Exception e) { + LOG.fatal("Cannot run ThriftServer", e); + // Crash the process if the ThriftServer is not running + System.exit(-1); + } + } + + public void shutdown() { + if (tserver != null) { + tserver.stop(); + } + } + + /** + * Sets up the Thrift TServer + */ + private void setupServer() throws Exception { + // Get port to bind to + int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); + + // Construct correct ProtocolFactory + TProtocolFactory protocolFactory; + if (conf.getBoolean(COMPACT_CONF_KEY, false)) { + LOG.debug("Using compact protocol"); + protocolFactory = new TCompactProtocol.Factory(); + } else { + LOG.debug("Using binary protocol"); + protocolFactory = new TBinaryProtocol.Factory(); + } + + Hbase.Processor processor = + new Hbase.Processor(handler); + ImplType implType = ImplType.getServerImpl(conf); + + // Construct correct TransportFactory + TTransportFactory transportFactory; + if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) { + transportFactory = new TFramedTransport.Factory(); + LOG.debug("Using framed transport"); + } else { + transportFactory = new TTransportFactory(); + } + + if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { + LOG.error("Server types " + Joiner.on(", ").join( + ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " + + "address binding at the moment. See " + + "https://issues.apache.org/jira/browse/HBASE-2155 for details."); + throw new RuntimeException( + BIND_CONF_KEY + " is not supported with server type " + implType); + } + + if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING || + implType == ImplType.THREADED_SELECTOR) { + + TNonblockingServerTransport serverTransport = + new TNonblockingServerSocket(listenPort); + + if (implType == ImplType.NONBLOCKING) { + TNonblockingServer.Args serverArgs = + new TNonblockingServer.Args(serverTransport); + setServerArgs(serverArgs, processor, transportFactory, protocolFactory); + tserver = new TNonblockingServer(serverArgs); + } else if (implType == ImplType.HS_HA) { + THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); + setServerArgs(serverArgs, processor, transportFactory, protocolFactory); + tserver = new THsHaServer(serverArgs); + } else { // THREADED_SELECTOR + TThreadedSelectorServer.Args serverArgs = + new HbaseTThreadedSelectorServerArgs(serverTransport, conf); + setServerArgs(serverArgs, processor, transportFactory, protocolFactory); + tserver = new TThreadedSelectorServer(serverArgs); + } + LOG.info("starting HBase " + implType.simpleClassName() + + " server on " + Integer.toString(listenPort)); + } else if (implType == ImplType.THREAD_POOL) { + // Thread pool server. Get the IP address to bind to. 
+
+  /**
+   * Setting up the thrift TServer
+   */
+  private void setupServer() throws Exception {
+    // Get port to bind to
+    int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+
+    // Construct correct ProtocolFactory
+    TProtocolFactory protocolFactory;
+    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
+      LOG.debug("Using compact protocol");
+      protocolFactory = new TCompactProtocol.Factory();
+    } else {
+      LOG.debug("Using binary protocol");
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+
+    Hbase.Processor<Hbase.Iface> processor =
+        new Hbase.Processor<Hbase.Iface>(handler);
+    ImplType implType = ImplType.getServerImpl(conf);
+
+    // Construct correct TransportFactory
+    TTransportFactory transportFactory;
+    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
+      transportFactory = new TFramedTransport.Factory();
+      LOG.debug("Using framed transport");
+    } else {
+      transportFactory = new TTransportFactory();
+    }
+
+    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+      LOG.error("Server types " + Joiner.on(", ").join(
+          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
+          "address binding at the moment. See " +
+          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
+      throw new RuntimeException(
+          BIND_CONF_KEY + " is not supported with " + implType);
+    }
+
+    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+        implType == ImplType.THREADED_SELECTOR) {
+
+      TNonblockingServerTransport serverTransport =
+          new TNonblockingServerSocket(listenPort);
+
+      if (implType == ImplType.NONBLOCKING) {
+        TNonblockingServer.Args serverArgs =
+            new TNonblockingServer.Args(serverTransport);
+        setServerArgs(serverArgs, processor, transportFactory,
+            protocolFactory);
+        tserver = new TNonblockingServer(serverArgs);
+      } else if (implType == ImplType.HS_HA) {
+        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+        setServerArgs(serverArgs, processor, transportFactory,
+            protocolFactory);
+        tserver = new THsHaServer(serverArgs);
+      } else { // THREADED_SELECTOR
+        TThreadedSelectorServer.Args serverArgs =
+            new HbaseTThreadedSelectorServerArgs(serverTransport, conf);
+        setServerArgs(serverArgs, processor, transportFactory,
+            protocolFactory);
+        tserver = new TThreadedSelectorServer(serverArgs);
+      }
+      LOG.info("starting HBase " + implType.simpleClassName() +
+          " server on port " + listenPort);
+    } else if (implType == ImplType.THREAD_POOL) {
+      // Thread pool server. Get the IP address to bind to.
+      InetAddress listenAddress = getBindAddress(conf);
+
+      TServerTransport serverTransport = new TServerSocket(
+          new InetSocketAddress(listenAddress, listenPort));
+
+      TBoundedThreadPoolServer.Args serverArgs =
+          new TBoundedThreadPoolServer.Args(serverTransport, conf);
+      setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
+      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " +
+          listenAddress + ":" + listenPort + "; " + serverArgs);
+      tserver = new TBoundedThreadPoolServer(serverArgs);
+    } else {
+      throw new AssertionError("Unsupported Thrift server implementation: " +
+          implType.simpleClassName());
+    }
+
+    // A sanity check that we instantiated the right type of server.
+    if (tserver.getClass() != implType.serverClass) {
+      throw new AssertionError("Expected to create Thrift server class " +
+          implType.serverClass.getName() + " but got " +
+          tserver.getClass().getName());
+    }
+
+    // login the server principal (if using secure Hadoop)
+    Configuration handlerConf = handler.conf;
+    if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(handlerConf)) {
+      String machineName = Strings.domainNamePointerToHostName(
+          DNS.getDefaultHost(
+              handlerConf.get("hbase.thrift.dns.interface", "default"),
+              handlerConf.get("hbase.thrift.dns.nameserver", "default")));
+      User.login(handlerConf, "hbase.thrift.keytab.file",
+          "hbase.thrift.kerberos.principal", machineName);
+    }
+  }
+
+  private InetAddress getBindAddress(Configuration conf) throws IOException {
+    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
+    return InetAddress.getByName(bindAddressStr);
+  }
+
+  private static void setServerArgs(AbstractServerArgs<?> serverArgs,
+      Hbase.Processor<Hbase.Iface> processor,
+      TTransportFactory transportFactory, TProtocolFactory protocolFactory) {
+    serverArgs.processor(processor);
+    serverArgs.transportFactory(transportFactory);
+    serverArgs.protocolFactory(protocolFactory);
+  }
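+
+  /*
+   * Client-side sketch (assumption, not part of this patch): a client must
+   * mirror the server's transport and protocol choice. Against a framed
+   * server (hsha/nonblocking/threadedselector, or FRAMED_CONF_KEY=true)
+   * with the default binary protocol, a raw Thrift client would look like:
+   *
+   *   TTransport transport =
+   *       new TFramedTransport(new TSocket("localhost", 9090));
+   *   transport.open();
+   *   Hbase.Client client =
+   *       new Hbase.Client(new TBinaryProtocol(transport));
+   */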
+
+  /**
+   * The HBaseHandler is a glue object that connects Thrift RPC calls to the
+   * HBase client API primarily defined in the HBaseAdmin and HTable objects.
+   */
+  public static class HBaseHandler implements Hbase.Iface {
+    protected Configuration conf;
+    protected HBaseAdmin admin = null;
+    protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+    // nextScannerId and scannerMap are used to manage scanner state
+    protected int nextScannerId = 0;
+    protected HashMap<Integer, ResultScanner> scannerMap = null;
+
+    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
+        new ThreadLocal<Map<String, HTable>>() {
+      @Override
+      protected Map<String, HTable> initialValue() {
+        return new TreeMap<String, HTable>();
+      }
+    };
+
+    /**
+     * Returns a list of all the column families for a given htable.
+     *
+     * @param table
+     * @return the column families as family-prefixed byte arrays
+     * @throws IOException
+     */
+    byte[][] getAllColumns(HTable table) throws IOException {
+      HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
+      byte[][] columns = new byte[cds.length][];
+      for (int i = 0; i < cds.length; i++) {
+        columns[i] = Bytes.add(cds[i].getName(),
+            KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
+      }
+      return columns;
+    }
+
+    /**
+     * Creates and returns an HTable instance from a given table name. The
+     * instance is cached per thread via threadLocalTables, since HTable is
+     * not thread-safe.
+     *
+     * @param tableName
+     *          name of table
+     * @return HTable object
+     * @throws IOException
+     */
+    protected HTable getTable(final byte[] tableName) throws IOException {
+      String table = new String(tableName);
+      Map<String, HTable> tables = threadLocalTables.get();
+      if (!tables.containsKey(table)) {
+        tables.put(table, new HTable(conf, tableName));
+      }
+      return tables.get(table);
+    }
+
+    protected HTable getTable(final ByteBuffer tableName) throws IOException {
+      return getTable(getBytes(tableName));
+    }
+
+    /**
+     * Assigns a unique ID to the scanner and adds the mapping to an internal
+     * hash-map.
+     *
+     * @param scanner
+     * @return integer scanner id
+     */
+    protected synchronized int addScanner(ResultScanner scanner) {
+      int id = nextScannerId++;
+      scannerMap.put(id, scanner);
+      return id;
+    }
+
+    /**
+     * Returns the scanner associated with the specified ID.
+     *
+     * @param id
+     * @return a Scanner, or null if ID was invalid.
+     */
+    protected synchronized ResultScanner getScanner(int id) {
+      return scannerMap.get(id);
+    }
+
+    /**
+     * Removes the scanner associated with the specified ID from the internal
+     * id->scanner hash-map.
+     *
+     * @param id
+     * @return the removed Scanner, or null if ID was invalid.
+     */
+    protected synchronized ResultScanner removeScanner(int id) {
+      return scannerMap.remove(id);
+    }
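+
+    /*
+     * Descriptive note (not part of this patch): the scanner id lifecycle
+     * ties the three helpers above together. The scannerOpen* calls below
+     * allocate an id via addScanner(), repeated scannerGet/scannerGetList
+     * calls resolve it with getScanner(), and scannerClose() releases it
+     * with removeScanner(). Ids are only meaningful within one handler
+     * instance, so clients must not reuse them across servers.
+     */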
+
+    /**
+     * Constructs an HBaseHandler object.
+     * @throws IOException
+     */
+    protected HBaseHandler() throws IOException {
+      this(HBaseConfiguration.create());
+    }
+
+    protected HBaseHandler(final Configuration c) throws IOException {
+      this.conf = c;
+      admin = new HBaseAdmin(conf);
+      scannerMap = new HashMap<Integer, ResultScanner>();
+    }
+
+    @Override
+    public void enableTable(ByteBuffer tableName) throws IOError {
+      try {
+        admin.enableTable(getBytes(tableName));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void disableTable(ByteBuffer tableName) throws IOError {
+      try {
+        admin.disableTable(getBytes(tableName));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
+      try {
+        return HTable.isTableEnabled(this.conf, getBytes(tableName));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
+      try {
+        admin.compact(getBytes(tableNameOrRegionName));
+      } catch (InterruptedException e) {
+        throw new IOError(e.getMessage());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
+      try {
+        admin.majorCompact(getBytes(tableNameOrRegionName));
+      } catch (InterruptedException e) {
+        throw new IOError(e.getMessage());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<ByteBuffer> getTableNames() throws IOError {
+      try {
+        HTableDescriptor[] tables = this.admin.listTables();
+        ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tables.length);
+        for (int i = 0; i < tables.length; i++) {
+          list.add(ByteBuffer.wrap(tables[i].getName()));
+        }
+        return list;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
+        throws IOError {
+      try {
+        List<HRegionInfo> hris =
+            this.admin.getTableRegions(tableName.array());
+        List<TRegionInfo> regions = new ArrayList<TRegionInfo>();
+
+        if (hris != null) {
+          for (HRegionInfo regionInfo : hris) {
+            TRegionInfo region = new TRegionInfo();
+            region.startKey = ByteBuffer.wrap(regionInfo.getStartKey());
+            region.endKey = ByteBuffer.wrap(regionInfo.getEndKey());
+            region.id = regionInfo.getRegionId();
+            region.name = ByteBuffer.wrap(regionInfo.getRegionName());
+            region.version = regionInfo.getVersion();
+            regions.add(region);
+          }
+        }
+        return regions;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public List<TCell> get(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer column) throws IOError {
+      byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if (famAndQf.length == 1) {
+        return get(tableName, row, famAndQf[0], new byte[0]);
+      }
+      return get(tableName, row, famAndQf[0], famAndQf[1]);
+    }
+
+    protected List<TCell> get(ByteBuffer tableName, ByteBuffer row,
+        byte[] family, byte[] qualifier) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Get get = new Get(getBytes(row));
+        if (qualifier == null || qualifier.length == 0) {
+          get.addFamily(family);
+        } else {
+          get.addColumn(family, qualifier);
+        }
+        Result result = table.get(get);
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer column, int numVersions) throws IOError {
+      byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if (famAndQf.length == 1) {
+        return getVer(tableName, row, famAndQf[0], new byte[0], numVersions);
+      }
+      return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions);
+    }
+
+    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
+        byte[] family, byte[] qualifier, int numVersions) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Get get = new Get(getBytes(row));
+        get.addColumn(family, qualifier);
+        get.setMaxVersions(numVersions);
+        Result result = table.get(get);
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer column, long timestamp, int numVersions) throws IOError {
+      byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if (famAndQf.length == 1) {
+        return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
+            numVersions);
+      }
+      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
+          numVersions);
+    }
+
+    protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row,
+        byte[] family, byte[] qualifier, long timestamp, int numVersions)
+        throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Get get = new Get(getBytes(row));
+        get.addColumn(family, qualifier);
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        get.setMaxVersions(numVersions);
+        Result result = table.get(get);
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row)
+        throws IOError {
+      return getRowWithColumnsTs(tableName, row, null,
+          HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
+        ByteBuffer row, List<ByteBuffer> columns) throws IOError {
+      return getRowWithColumnsTs(tableName, row, columns,
+          HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
+        long timestamp) throws IOError {
+      return getRowWithColumnsTs(tableName, row, null, timestamp);
+    }
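+
+    /*
+     * Column-name convention (illustrative, not part of this patch): the
+     * single-ByteBuffer column arguments above are split by
+     * KeyValue.parseColumn(), so "info" addresses a whole family while
+     * "info:name" addresses one qualifier. From a Thrift client:
+     *
+     *   List<TCell> cells = client.get(
+     *       ByteBuffer.wrap(Bytes.toBytes("t1")),
+     *       ByteBuffer.wrap(Bytes.toBytes("row1")),
+     *       ByteBuffer.wrap(Bytes.toBytes("info:name")));
+     */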
+
+    @Override
+    public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName,
+        ByteBuffer row, List<ByteBuffer> columns, long timestamp)
+        throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Get get = new Get(getBytes(row));
+        if (columns != null) {
+          for (ByteBuffer column : columns) {
+            byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+            if (famAndQf.length == 1) {
+              get.addFamily(famAndQf[0]);
+            } else {
+              get.addColumn(famAndQf[0], famAndQf[1]);
+            }
+          }
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        Result result = table.get(get);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<TRowResult> getRows(ByteBuffer tableName,
+        List<ByteBuffer> rows) throws IOError {
+      return getRowsWithColumnsTs(tableName, rows, null,
+          HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
+        List<ByteBuffer> rows, List<ByteBuffer> columns) throws IOError {
+      return getRowsWithColumnsTs(tableName, rows, columns,
+          HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowsTs(ByteBuffer tableName,
+        List<ByteBuffer> rows, long timestamp) throws IOError {
+      return getRowsWithColumnsTs(tableName, rows, null, timestamp);
+    }
+
+    @Override
+    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
+        List<ByteBuffer> rows, List<ByteBuffer> columns, long timestamp)
+        throws IOError {
+      try {
+        List<Get> gets = new ArrayList<Get>(rows.size());
+        HTable table = getTable(tableName);
+        for (ByteBuffer row : rows) {
+          Get get = new Get(getBytes(row));
+          if (columns != null) {
+            for (ByteBuffer column : columns) {
+              byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+              if (famAndQf.length == 1) {
+                get.addFamily(famAndQf[0]);
+              } else {
+                get.addColumn(famAndQf[0], famAndQf[1]);
+              }
+            }
+          }
+          // Apply the time range whether or not columns were specified.
+          get.setTimeRange(Long.MIN_VALUE, timestamp);
+          gets.add(get);
+        }
+        Result[] result = table.get(gets);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void deleteAll(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer column) throws IOError {
+      deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void deleteAllTs(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer column, long timestamp) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Delete delete = new Delete(getBytes(row));
+        byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+        if (famAndQf.length == 1) {
+          delete.deleteFamily(famAndQf[0], timestamp);
+        } else {
+          delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+        }
+        table.delete(delete);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void deleteAllRow(ByteBuffer tableName, ByteBuffer row)
+        throws IOError {
+      deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row,
+        long timestamp) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Delete delete = new Delete(getBytes(row), timestamp, null);
+        table.delete(delete);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
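+
+    /*
+     * Deletion semantics sketch (not part of this patch): deleteAllTs with
+     * a bare family name ("fam") drops the whole family at or before the
+     * timestamp, while "fam:qual" drops all versions of that one column;
+     * deleteAllRowTs removes every cell in the row, e.g.:
+     *
+     *   client.deleteAllRowTs(table, row, System.currentTimeMillis());
+     */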
+
+    @Override
+    public void createTable(ByteBuffer in_tableName,
+        List<ColumnDescriptor> columnFamilies)
+        throws IOError, IllegalArgument, AlreadyExists {
+      byte[] tableName = getBytes(in_tableName);
+      try {
+        if (admin.tableExists(tableName)) {
+          throw new AlreadyExists("table name already in use");
+        }
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        for (ColumnDescriptor col : columnFamilies) {
+          HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
+          desc.addFamily(colDesc);
+        }
+        admin.createTable(desc);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgument(e.getMessage());
+      }
+    }
+
+    @Override
+    public void deleteTable(ByteBuffer in_tableName) throws IOError {
+      byte[] tableName = getBytes(in_tableName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
+      }
+      try {
+        if (!admin.tableExists(tableName)) {
+          throw new IOError("table does not exist");
+        }
+        admin.deleteTable(tableName);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void mutateRow(ByteBuffer tableName, ByteBuffer row,
+        List<Mutation> mutations) throws IOError, IllegalArgument {
+      mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
+        List<Mutation> mutations, long timestamp)
+        throws IOError, IllegalArgument {
+      HTable table = null;
+      try {
+        table = getTable(tableName);
+        Put put = new Put(getBytes(row), timestamp, null);
+        Delete delete = new Delete(getBytes(row));
+
+        // Sort the mutations into one Delete and one Put for this row.
+        for (Mutation m : mutations) {
+          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+          if (m.isDelete) {
+            if (famAndQf.length == 1) {
+              delete.deleteFamily(famAndQf[0], timestamp);
+            } else {
+              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+            }
+          } else {
+            byte[] qualifier = famAndQf.length == 1
+                ? HConstants.EMPTY_BYTE_ARRAY : famAndQf[1];
+            put.add(famAndQf[0], qualifier,
+                m.value != null ? m.value.array()
+                    : HConstants.EMPTY_BYTE_ARRAY);
+          }
+        }
+        if (!delete.isEmpty()) {
+          table.delete(delete);
+        }
+        if (!put.isEmpty()) {
+          table.put(put);
+        }
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgument(e.getMessage());
+      }
+    }
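+
+    /*
+     * Client-side sketch for mutateRow (assumption, not part of this
+     * patch): each Mutation either writes or deletes one column, and the
+     * whole list is applied to a single row:
+     *
+     *   Mutation m = new Mutation();
+     *   m.column = ByteBuffer.wrap(Bytes.toBytes("info:name"));
+     *   m.value = ByteBuffer.wrap(Bytes.toBytes("alice"));
+     *   m.isDelete = false;
+     *   client.mutateRow(table, row, Collections.singletonList(m));
+     */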
+
+    @Override
+    public void mutateRows(ByteBuffer tableName,
+        List<BatchMutation> rowBatches)
+        throws IOError, IllegalArgument, TException {
+      mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void mutateRowsTs(ByteBuffer tableName,
+        List<BatchMutation> rowBatches, long timestamp)
+        throws IOError, IllegalArgument, TException {
+      List<Put> puts = new ArrayList<Put>();
+      List<Delete> deletes = new ArrayList<Delete>();
+
+      for (BatchMutation batch : rowBatches) {
+        byte[] row = getBytes(batch.row);
+        List<Mutation> mutations = batch.mutations;
+        Delete delete = new Delete(row);
+        Put put = new Put(row, timestamp, null);
+        for (Mutation m : mutations) {
+          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+          if (m.isDelete) {
+            // no qualifier, family only.
+            if (famAndQf.length == 1) {
+              delete.deleteFamily(famAndQf[0], timestamp);
+            } else {
+              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+            }
+          } else {
+            byte[] qualifier = famAndQf.length == 1
+                ? HConstants.EMPTY_BYTE_ARRAY : famAndQf[1];
+            put.add(famAndQf[0], qualifier,
+                m.value != null ? m.value.array()
+                    : HConstants.EMPTY_BYTE_ARRAY);
+          }
+        }
+        if (!delete.isEmpty()) {
+          deletes.add(delete);
+        }
+        if (!put.isEmpty()) {
+          puts.add(put);
+        }
+      }
+
+      HTable table = null;
+      try {
+        table = getTable(tableName);
+        if (!puts.isEmpty()) {
+          table.put(puts);
+        }
+        for (Delete del : deletes) {
+          table.delete(del);
+        }
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgument(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer column, long amount)
+        throws IOError, IllegalArgument, TException {
+      byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if (famAndQf.length == 1) {
+        return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
+            amount);
+      }
+      return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1],
+          amount);
+    }
+
+    protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
+        byte[] family, byte[] qualifier, long amount)
+        throws IOError, IllegalArgument, TException {
+      HTable table;
+      try {
+        table = getTable(tableName);
+        return table.incrementColumnValue(getBytes(row), family, qualifier,
+            amount);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void scannerClose(int id) throws IOError, IllegalArgument {
+      LOG.debug("scannerClose: id=" + id);
+      ResultScanner scanner = getScanner(id);
+      if (scanner == null) {
+        throw new IllegalArgument("scanner ID is invalid");
+      }
+      scanner.close();
+      removeScanner(id);
+    }
+
+    @Override
+    public List<TRowResult> scannerGetList(int id, int nbRows)
+        throws IllegalArgument, IOError {
+      LOG.debug("scannerGetList: id=" + id);
+      ResultScanner scanner = getScanner(id);
+      if (null == scanner) {
+        throw new IllegalArgument("scanner ID is invalid");
+      }
+
+      Result[] results = null;
+      try {
+        results = scanner.next(nbRows);
+        if (null == results) {
+          return new ArrayList<TRowResult>();
+        }
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+      return ThriftUtilities.rowResultFromHBase(results);
+    }
+
+    @Override
+    public List<TRowResult> scannerGet(int id)
+        throws IllegalArgument, IOError {
+      return scannerGetList(id, 1);
+    }
+
+    @Override
+    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan)
+        throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan();
+        if (tScan.isSetStartRow()) {
+          scan.setStartRow(tScan.getStartRow());
+        }
+        if (tScan.isSetStopRow()) {
+          scan.setStopRow(tScan.getStopRow());
+        }
+        if (tScan.isSetTimestamp()) {
+          scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
+        }
+        if (tScan.isSetCaching()) {
+          scan.setCaching(tScan.getCaching());
+        }
+        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
+          for (ByteBuffer column : tScan.getColumns()) {
+            byte[][] famQf = KeyValue.parseColumn(getBytes(column));
+            if (famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        if (tScan.isSetFilterString()) {
+          ParseFilter parseFilter = new ParseFilter();
+          scan.setFilter(
+              parseFilter.parseFilterString(tScan.getFilterString()));
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
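+
+    /*
+     * TScan sketch (assumption, not part of this patch): scannerOpenWithScan
+     * exposes the HBase filter language via ParseFilter, so a client can
+     * push a server-side filter as a string:
+     *
+     *   TScan tScan = new TScan();
+     *   tScan.setStartRow(ByteBuffer.wrap(Bytes.toBytes("row0")));
+     *   tScan.setFilterString(
+     *       ByteBuffer.wrap(Bytes.toBytes("PrefixFilter('row')")));
+     *   int id = client.scannerOpenWithScan(table, tScan);
+     */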
+
+    @Override
+    public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
+        List<ByteBuffer> columns) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow));
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
+            byte[][] famQf = KeyValue.parseColumn(getBytes(column));
+            if (famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
+        ByteBuffer stopRow, List<ByteBuffer> columns)
+        throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
+            byte[][] famQf = KeyValue.parseColumn(getBytes(column));
+            if (famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenWithPrefix(ByteBuffer tableName,
+        ByteBuffer startAndPrefix, List<ByteBuffer> columns)
+        throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startAndPrefix));
+        Filter f = new WhileMatchFilter(
+            new PrefixFilter(getBytes(startAndPrefix)));
+        scan.setFilter(f);
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
+            byte[][] famQf = KeyValue.parseColumn(getBytes(column));
+            if (famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
+        List<ByteBuffer> columns, long timestamp)
+        throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow));
+        scan.setTimeRange(Long.MIN_VALUE, timestamp);
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
+            byte[][] famQf = KeyValue.parseColumn(getBytes(column));
+            if (famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenWithStopTs(ByteBuffer tableName,
+        ByteBuffer startRow, ByteBuffer stopRow, List<ByteBuffer> columns,
+        long timestamp) throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+        scan.setTimeRange(Long.MIN_VALUE, timestamp);
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
+            byte[][] famQf = KeyValue.parseColumn(getBytes(column));
+            if (famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
+        ByteBuffer tableName) throws IOError, TException {
+      try {
+        TreeMap<ByteBuffer, ColumnDescriptor> columns =
+            new TreeMap<ByteBuffer, ColumnDescriptor>();
+
+        HTable table = getTable(tableName);
+        HTableDescriptor desc = table.getTableDescriptor();
+
+        for (HColumnDescriptor family : desc.getFamilies()) {
+          ColumnDescriptor col = ThriftUtilities.colDescFromHbase(family);
+          columns.put(col.name, col);
+        }
+        return columns;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
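+
+    /*
+     * End-to-end scanner sketch from the client side (assumption, not part
+     * of this patch): open, drain in batches, and always close the id:
+     *
+     *   int id = client.scannerOpen(table,
+     *       ByteBuffer.wrap(Bytes.toBytes("")), columns);
+     *   try {
+     *     for (List<TRowResult> batch = client.scannerGetList(id, 100);
+     *          !batch.isEmpty();
+     *          batch = client.scannerGetList(id, 100)) {
+     *       // process batch
+     *     }
+     *   } finally {
+     *     client.scannerClose(id);
+     *   }
+     */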
+
+    @Override
+    public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer family) throws IOError {
+      try {
+        HTable table = getTable(getBytes(tableName));
+        Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+      try {
+        HTable table = getTable(HConstants.META_TABLE_NAME);
+        Result startRowResult = table.getRowOrBefore(
+            searchRow.array(), HConstants.CATALOG_FAMILY);
+
+        if (startRowResult == null) {
+          throw new IOException("Cannot find row in .META., row=" +
+              Bytes.toString(searchRow.array()));
+        }
+
+        // find region start and end keys
+        byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        if (value == null || value.length == 0) {
+          throw new IOException("HRegionInfo REGIONINFO was null or empty " +
+              "in Meta for row=" + Bytes.toString(searchRow.array()));
+        }
+        HRegionInfo regionInfo = Writables.getHRegionInfo(value);
+        TRegionInfo region = new TRegionInfo();
+        region.setStartKey(regionInfo.getStartKey());
+        region.setEndKey(regionInfo.getEndKey());
+        region.id = regionInfo.getRegionId();
+        region.setName(regionInfo.getRegionName());
+        region.version = regionInfo.getVersion();
+
+        // find region assignment to server
+        value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.SERVER_QUALIFIER);
+        if (value != null && value.length > 0) {
+          String hostAndPort = Bytes.toString(value);
+          region.setServerName(Bytes.toBytes(
+              Addressing.parseHostname(hostAndPort)));
+          region.port = Addressing.parsePort(hostAndPort);
+        }
+        return region;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
index 12247d0..5134ed1 100644
--- a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
+++ b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
@@ -40,7 +40,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * Unit testing for ThriftServer.HBaseHandler, a part of the
+ * Unit testing for ThriftServerRunner.HBaseHandler, a part of the
 * org.apache.hadoop.hbase.thrift package.
 */
@Category(MediumTests.class)
@@ -100,8 +100,8 @@ public class TestThriftServer {
    */
   @Test
   public void doTestTableCreateDrop() throws Exception {
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     createTestTables(handler);
     dropTestTables(handler);
   }
@@ -140,8 +140,8 @@ public class TestThriftServer {
    */
   public void doTestTableMutations() throws Exception {
     // Setup
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
     // Apply a few Mutations to rowA
@@ -213,8 +213,8 @@ public class TestThriftServer {
    */
   public void doTestTableTimestampsAndColumns() throws Exception {
     // Setup
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
     // Apply timestamped Mutations to rowA
@@ -292,8 +292,8 @@ public class TestThriftServer {
    */
   public void doTestTableScanners() throws Exception {
     // Setup
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
     // Apply timestamped Mutations to rowA
@@ -360,8 +360,8 @@ public class TestThriftServer {
    * @throws Exception
    */
   public void doTestGetTableRegions() throws Exception {
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
     int regionCount = handler.getTableRegions(tableAname).size();
     assertEquals("empty table should have only 1 region, " +
@@ -456,7 +456,7 @@ public class TestThriftServer {
    * @param handler the HBaseHandler interfacing to HBase
    * @throws Exception
    */
-  private void closeScanner(int scannerId, ThriftServer.HBaseHandler handler) throws Exception {
+  private void closeScanner(int scannerId, ThriftServerRunner.HBaseHandler handler) throws Exception {
     handler.scannerGet(scannerId);
     handler.scannerClose(scannerId);
   }
diff --git a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
index 477141f..9607279 100644
--- a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
+++ b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
@@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.thrift.ThriftServer.ImplType;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -82,7 +82,7 @@ public class TestThriftServerCmdLine {
   @Parameters
   public static Collection<Object[]> getParameters() {
     Collection<Object[]> parameters = new ArrayList<Object[]>();
-    for (ThriftServer.ImplType implType : ThriftServer.ImplType.values()) {
+    for (ImplType implType : ImplType.values()) {
       for (boolean specifyFramed : new boolean[] {false, true}) {
         for (boolean specifyBindIP : new boolean[] {false, true}) {
           if (specifyBindIP && !implType.canSpecifyBindIP) {
@@ -181,7 +181,8 @@ public class TestThriftServerCmdLine {
     } else {
       expectedClass = TBoundedThreadPoolServer.class;
     }
-    assertEquals(expectedClass, thriftServer.server.getClass());
+    assertEquals(expectedClass,
+        thriftServer.serverRunner.tserver.getClass());
 
     if (clientSideException != null) {
       LOG.error("Thrift client threw an exception", clientSideException);