Index: src/java/org/apache/hadoop/hbase/thrift/package.html =================================================================== --- src/java/org/apache/hadoop/hbase/thrift/package.html (revision 901470) +++ src/java/org/apache/hadoop/hbase/thrift/package.html (working copy) @@ -34,14 +34,52 @@

Description

-

The Hbase API is defined in the -file Hbase.thrift. A server-side implementation of the API is in -org.apache.hadoop.hbase.thrift.ThriftServer. The generated interfaces, -types, and RPC utility files are checked into SVN under the -org.apache.hadoop.hbase.thrift.generated directory. +

Important note: This Thrift interface is deprecated and scheduled for removal in HBase 0.22. +A new version that matches the client API that was introduced in HBase 0.21 can be found +in the contrib directory. +

+

The {@link org.apache.hadoop.hbase.thrift.generated.Hbase.Iface HBase API} is defined in the +file Hbase.thrift>. A server-side implementation of the API is in +{@link org.apache.hadoop.hbase.thrift.ThriftServer}. The generated interfaces, +types, and RPC utility files reside in the +{@link org.apache.hadoop.hbase.thrift.generated} package.

+

To start ThriftServer, use: +

+  ./bin/hbase-daemon.sh start thrift
+
+

+ +

To stop, use: +

+  ./bin/hbase-daemon.sh stop thrift
+
+

+ +These are the command line arguments the Thrift server understands in addition to start and stop: +
+
-b, --bind
+
Address to bind the Thrift server to [default: 0.0.0.0]
+ +
-p, --port
+
Port to bind to [default: 9090]
+ +
-f, --framed
+
Use framed transport (implied when using one of the non-blocking servers)
+ +
-c, --compact
+
Use the compact protocol [default: binary protocol]
+ +
-h, --help
+
Displays usage information for the Thrift server
+
+ +

Details

+ +

HBase currently uses version 0.2.0 of Apache Thrift.

+

The files were generated by running the commands:

   thrift -strict --gen java:hashcode Hbase.thrift
@@ -51,22 +89,9 @@
 

The 'thrift' binary is the Thrift compiler, and it is distributed as a part -of the Thrift package. Additionally, specific language runtime libraries are a +of the Thrift package. Additionally, specific language runtime libraries are a part of the Thrift package. A version of the Java runtime is checked into SVN -under the hbase/lib directory. -

+under the hbase/lib directory.

-

To start ThriftServer, use: -

-  ./bin/hbase-daemon.sh start thrift [--port=PORT]
-
-The default port is 9090. -

- -

To stop, use: -

-  ./bin/hbase-daemon.sh stop thrift
-
-

Index: src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (revision 901470) +++ src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (working copy) @@ -18,13 +18,11 @@ package org.apache.hadoop.hbase.thrift; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -60,18 +58,32 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + /** * ThriftServer - this class starts up a Thrift server which implements the * Hbase API specified in the Hbase.thrift IDL file. */ public class ThriftServer { - + /** * The HBaseHandler is a glue object that connects Thrift RPC calls to the * HBase client API primarily defined in the HBaseAdmin and HTable objects. @@ -84,10 +96,10 @@ // nextScannerId and scannerMap are used to manage scanner state protected int nextScannerId = 0; protected HashMap scannerMap = null; - + /** * Returns a list of all the column families for a given htable. - * + * * @param table * @return * @throws IOException @@ -96,15 +108,15 @@ HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); byte[][] columns = new byte[cds.length][]; for (int i = 0; i < cds.length; i++) { - columns[i] = Bytes.add(cds[i].getName(), + columns[i] = Bytes.add(cds[i].getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY); } return columns; } - + /** * Creates and returns an HTable instance from a given table name. - * + * * @param tableName * name of table * @return HTable object @@ -115,11 +127,11 @@ IOException { return new HTable(this.conf, tableName); } - + /** * Assigns a unique ID to the scanner and adds the mapping to an internal * hash-map. - * + * * @param scanner * @return integer scanner id */ @@ -128,31 +140,31 @@ scannerMap.put(id, scanner); return id; } - + /** * Returns the scanner associated with the specified ID. - * + * * @param id * @return a Scanner, or null if ID was invalid. */ protected synchronized ResultScanner getScanner(int id) { return scannerMap.get(id); } - + /** * Removes the scanner associated with the specified ID from the internal * id->scanner hash-map. - * + * * @param id * @return a Scanner, or null if ID was invalid. */ protected synchronized ResultScanner removeScanner(int id) { return scannerMap.remove(id); } - + /** * Constructs an HBaseHandler object. - * + * * @throws MasterNotRunningException */ HBaseHandler() throws MasterNotRunningException { @@ -160,7 +172,7 @@ admin = new HBaseAdmin(conf); scannerMap = new HashMap(); } - + public void enableTable(final byte[] tableName) throws IOError { try{ admin.enableTable(tableName); @@ -168,7 +180,7 @@ throw new IOError(e.getMessage()); } } - + public void disableTable(final byte[] tableName) throws IOError{ try{ admin.disableTable(tableName); @@ -176,7 +188,7 @@ throw new IOError(e.getMessage()); } } - + public boolean isTableEnabled(final byte[] tableName) throws IOError { try { return HTable.isTableEnabled(tableName); @@ -184,7 +196,7 @@ throw new IOError(e.getMessage()); } } - + public void compact(byte[] tableNameOrRegionName) throws IOError { try{ admin.compact(tableNameOrRegionName); @@ -198,9 +210,9 @@ admin.majorCompact(tableNameOrRegionName); } catch (IOException e) { throw new IOError(e.getMessage()); - } + } } - + public List getTableNames() throws IOError { try { HTableDescriptor[] tables = this.admin.listTables(); @@ -213,7 +225,7 @@ throw new IOError(e.getMessage()); } } - + public List getTableRegions(byte[] tableName) throws IOError { try{ @@ -235,7 +247,7 @@ throw new IOError(e.getMessage()); } } - + @Deprecated public List get(byte[] tableName, byte[] row, byte[] column) throws IOError { @@ -262,7 +274,7 @@ throw new IOError(e.getMessage()); } } - + @Deprecated public List getVer(byte[] tableName, byte[] row, byte[] column, int numVersions) throws IOError { @@ -273,7 +285,7 @@ return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions); } - public List getVer(byte [] tableName, byte [] row, byte [] family, + public List getVer(byte [] tableName, byte [] row, byte [] family, byte [] qualifier, int numVersions) throws IOError { try { HTable table = getTable(tableName); @@ -286,7 +298,7 @@ throw new IOError(e.getMessage()); } } - + @Deprecated public List getVerTs(byte[] tableName, byte[] row, byte[] column, long timestamp, int numVersions) throws IOError { @@ -295,7 +307,7 @@ return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp, numVersions); } - return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, + return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions); } @@ -313,25 +325,25 @@ throw new IOError(e.getMessage()); } } - + public List getRow(byte[] tableName, byte[] row) throws IOError { return getRowWithColumnsTs(tableName, row, null, HConstants.LATEST_TIMESTAMP); } - + public List getRowWithColumns(byte[] tableName, byte[] row, List columns) throws IOError { return getRowWithColumnsTs(tableName, row, columns, HConstants.LATEST_TIMESTAMP); } - + public List getRowTs(byte[] tableName, byte[] row, long timestamp) throws IOError { return getRowWithColumnsTs(tableName, row, null, timestamp); } - + public List getRowWithColumnsTs(byte[] tableName, byte[] row, List columns, long timestamp) throws IOError { try { @@ -359,12 +371,12 @@ throw new IOError(e.getMessage()); } } - + public void deleteAll(byte[] tableName, byte[] row, byte[] column) throws IOError { deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP); } - + public void deleteAllTs(byte[] tableName, byte[] row, byte[] column, long timestamp) throws IOError { try { @@ -377,16 +389,16 @@ delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); } table.delete(delete); - + } catch (IOException e) { throw new IOError(e.getMessage()); } } - + public void deleteAllRow(byte[] tableName, byte[] row) throws IOError { deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP); } - + public void deleteAllRowTs(byte[] tableName, byte[] row, long timestamp) throws IOError { try { @@ -397,7 +409,7 @@ throw new IOError(e.getMessage()); } } - + public void createTable(byte[] tableName, List columnFamilies) throws IOError, IllegalArgument, AlreadyExists { @@ -417,7 +429,7 @@ throw new IllegalArgument(e.getMessage()); } } - + public void deleteTable(byte[] tableName) throws IOError { if (LOG.isDebugEnabled()) { LOG.debug("deleteTable: table=" + new String(tableName)); @@ -431,13 +443,13 @@ throw new IOError(e.getMessage()); } } - + public void mutateRow(byte[] tableName, byte[] row, List mutations) throws IOError, IllegalArgument { mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP); } - - public void mutateRowTs(byte[] tableName, byte[] row, + + public void mutateRowTs(byte[] tableName, byte[] row, List mutations, long timestamp) throws IOError, IllegalArgument { HTable table = null; try { @@ -473,8 +485,8 @@ throw new IllegalArgument(e.getMessage()); } } - - public void mutateRows(byte[] tableName, List rowBatches) + + public void mutateRows(byte[] tableName, List rowBatches) throws IOError, IllegalArgument, TException { mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP); } @@ -528,18 +540,18 @@ } @Deprecated - public long atomicIncrement(byte[] tableName, byte[] row, byte[] column, + public long atomicIncrement(byte[] tableName, byte[] row, byte[] column, long amount) throws IOError, IllegalArgument, TException { byte [][] famAndQf = KeyValue.parseColumn(column); if(famAndQf.length == 1) { - return atomicIncrement(tableName, row, famAndQf[0], new byte[0], + return atomicIncrement(tableName, row, famAndQf[0], new byte[0], amount); } return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); } public long atomicIncrement(byte [] tableName, byte [] row, byte [] family, - byte [] qualifier, long amount) + byte [] qualifier, long amount) throws IOError, IllegalArgument, TException { HTable table; try { @@ -549,7 +561,7 @@ throw new IOError(e.getMessage()); } } - + public void scannerClose(int id) throws IOError, IllegalArgument { LOG.debug("scannerClose: id=" + id); ResultScanner scanner = getScanner(id); @@ -559,7 +571,7 @@ scanner.close(); removeScanner(id); } - + public List scannerGetList(int id,int nbRows) throws IllegalArgument, IOError { LOG.debug("scannerGetList: id=" + id); ResultScanner scanner = getScanner(id); @@ -601,7 +613,7 @@ throw new IOError(e.getMessage()); } } - + public int scannerOpenWithStop(byte[] tableName, byte[] startRow, byte[] stopRow, List columns) throws IOError, TException { try { @@ -668,7 +680,7 @@ throw new IOError(e.getMessage()); } } - + public int scannerOpenWithStopTs(byte[] tableName, byte[] startRow, byte[] stopRow, List columns, long timestamp) throws IOError, TException { @@ -692,16 +704,16 @@ throw new IOError(e.getMessage()); } } - + public Map getColumnDescriptors( byte[] tableName) throws IOError, TException { try { TreeMap columns = new TreeMap(Bytes.BYTES_COMPARATOR); - + HTable table = getTable(tableName); HTableDescriptor desc = table.getTableDescriptor(); - + for (HColumnDescriptor e : desc.getFamilies()) { ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); columns.put(col.name, col); @@ -710,85 +722,108 @@ } catch (IOException e) { throw new IOError(e.getMessage()); } - } + } } - + + // // Main program and support routines // - - private static void printUsageAndExit() { - printUsageAndExit(null); + + private static void printUsageAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("Thrift", null, options, + "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" + + "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" + + " send a kill signal to the thrift server pid", + true); + System.exit(exitCode); } - - private static void printUsageAndExit(final String message) { - if (message != null) { - System.err.println(message); - } - System.out.println("Usage: java org.apache.hadoop.hbase.thrift.ThriftServer " + - "--help | [--port=PORT] start"); - System.out.println("Arguments:"); - System.out.println(" start Start thrift server"); - System.out.println(" stop Stop thrift server"); - System.out.println("Options:"); - System.out.println(" port Port to listen on. Default: 9090"); - // System.out.println(" bind Address to bind on. Default: 0.0.0.0."); - System.out.println(" help Print this message and exit"); - System.exit(0); - } + private static final String DEFAULT_LISTEN_PORT = "9090"; + /* * Start up the Thrift server. * @param args */ protected static void doMain(final String [] args) throws Exception { - if (args.length < 1) { - printUsageAndExit(); + Log LOG = LogFactory.getLog("ThriftServer"); + + Options options = new Options(); + options.addOption("b", "bind", true, "Address to bind the Thrift server to [default: 0.0.0.0]"); + options.addOption("p", "port", true, "Port to bind to [default: 9090]"); + options.addOption("f", "framed", false, "Use framed transport"); + options.addOption("c", "compact", false, "Use the compact protocol"); + options.addOption("h", "help", false, "Print help information"); + + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args); + + /** + * This is so complicated to please both bin/hbase and bin/hbase-daemon.sh + * hbase-daemon provides "start" and "stop" arguments + * hbase should print the help if no argument is provided + */ + List commandLine = Arrays.asList(args); + boolean stop = commandLine.contains("stop"); + boolean start = commandLine.contains("start"); + if (cmd.hasOption("help") || !start || stop) { + printUsageAndExit(options, 1); } - int port = 9090; - // String bindAddress = "0.0.0.0"; - - // Process command-line args. TODO: Better cmd-line processing - // (but hopefully something not as painful as cli options). -// final String addressArgKey = "--bind="; - final String portArgKey = "--port="; - for (String cmd: args) { -// if (cmd.startsWith(addressArgKey)) { -// bindAddress = cmd.substring(addressArgKey.length()); -// continue; -// } else - if (cmd.startsWith(portArgKey)) { - port = Integer.parseInt(cmd.substring(portArgKey.length())); - continue; - } else if (cmd.equals("--help") || cmd.equals("-h")) { - printUsageAndExit(); - } else if (cmd.equals("start")) { - continue; - } else if (cmd.equals("stop")) { - printUsageAndExit("To shutdown the thrift server run " + - "bin/hbase-daemon.sh stop thrift or send a kill signal to " + - "the thrift server pid"); + // Get IP address to bind to + InetAddress listenAddress = null; + if (cmd.hasOption("bind")) { + try { + listenAddress = InetAddress.getByName(cmd.getOptionValue("bind")); + } catch (UnknownHostException e) { + LOG.error("Could not bind to provided ip address", e); + printUsageAndExit(options, -1); } - - // Print out usage if we get to here. - printUsageAndExit(); + } else { + listenAddress = InetAddress.getLocalHost(); } - Log LOG = LogFactory.getLog("ThriftServer"); - LOG.info("starting HBase Thrift server on port " + - Integer.toString(port)); + + // Get port to bind to + int listenPort = 0; + try { + listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT)); + } catch (NumberFormatException e) { + LOG.error("Could not parse the value provided for the port option", e); + printUsageAndExit(options, -1); + } + TServerTransport serverTransport = new TServerSocket(new InetSocketAddress(listenAddress, listenPort)); + + LOG.info("starting HBase Thrift server on " + listenAddress + ":" + Integer.toString(listenPort)); + + // Construct correct TransportFactory + TTransportFactory transportFactory; + if (cmd.hasOption("framed")) { + transportFactory = new TFramedTransport.Factory(); + LOG.debug("Using framed transport"); + } else { + transportFactory = new TTransportFactory(); + } + + // Construct correct ProtocolFactory + TProtocolFactory protocolFactory; + if (cmd.hasOption("compact")) { + LOG.debug("Using compact protocol"); + protocolFactory = new TCompactProtocol.Factory(); + } else { + LOG.debug("Using binary protocol"); + protocolFactory = new TBinaryProtocol.Factory(); + } + HBaseHandler handler = new HBaseHandler(); Hbase.Processor processor = new Hbase.Processor(handler); - TServerTransport serverTransport = new TServerSocket(port); - TProtocolFactory protFactory = new TBinaryProtocol.Factory(true, true); - TServer server = new TThreadPoolServer(processor, serverTransport, - protFactory); + TServer server = new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory); server.serve(); } - + /** * @param args - * @throws Exception + * @throws Exception */ public static void main(String [] args) throws Exception { doMain(args);