diff --git a/src/main/java/org/apache/hadoop/hbase/thrift2/HBaseAdminPool.java b/src/main/java/org/apache/hadoop/hbase/thrift2/HBaseAdminPool.java new file mode 100644 index 0000000..9bae967 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift2/HBaseAdminPool.java @@ -0,0 +1,64 @@ +package org.apache.hadoop.hbase.thrift2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class HBaseAdminPool { + private final Queue admins = new ConcurrentLinkedQueue(); + private final Configuration config; + private final int maxSize; + + /** + * Default Constructor. Default HBaseConfiguration and no limit on pool size. + */ + public HBaseAdminPool() { + this(HBaseConfiguration.create(), Integer.MAX_VALUE); + } + + /** + * Constructor to set maximum versions and use the specified configuration. + * @param config configuration + * @param maxSize maximum number of references to keep for each table + */ + public HBaseAdminPool(Configuration config, int maxSize) { + this.config = config; + this.maxSize = maxSize; + } + + /** + * Get a reference to a HBaseAdmin. + * Create a new one if one is not available. + * + * @return a HBaseAdmin + * @throws RuntimeException if there is a problem instantiating the HTable + */ + public HBaseAdmin getAdmin() throws MasterNotRunningException { + HBaseAdmin admin = admins.poll(); + if (admin == null) { + return createHBaseAdmin(); + } + return admin; + } + + /** + * Puts the specified HBaseAdmin back into the pool.

+ * + * If the pool already contains maxSize references to the table, + * then nothing happens. + * @param admin + */ + public void putAdmin(HBaseAdmin admin) { + if (admins.size() >= maxSize) return; + admins.add(admin); + } + + protected HBaseAdmin createHBaseAdmin() throws MasterNotRunningException { + return new HBaseAdmin(config); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHbaseClientHandler.java b/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHbaseClientHandler.java new file mode 100644 index 0000000..f2b4c89 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHbaseClientHandler.java @@ -0,0 +1,264 @@ +package org.apache.hadoop.hbase.thrift2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.thrift2.generated.*; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*; + +/** + * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily + * defined in the HTableInterface. + */ +public class ThriftHBaseClientHandler implements HbaseClient.Iface { + + // TODO: Size of pool configuraple + private final HTablePool htablePool = new HTablePool(); + private static final Log LOG = LogFactory.getLog(ThriftHBaseClientHandler.class); + + // nextScannerId and scannerMap are used to manage scanner state + // TODO: Cleanup thread for Scanners, Scanner id wrap + private final AtomicInteger nextScannerId = new AtomicInteger(0); + private final Map scannerMap = new ConcurrentHashMap(); + + private HTableInterface getTable(byte[] tableName) { + return htablePool.getTable(tableName); + } + + private void putTable(HTableInterface table) { + htablePool.putTable(table); + } + + /** + * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap. + * + * @param scanner to add + * + * @return Id for this Scanner + */ + private int addScanner(ResultScanner scanner) { + int id = nextScannerId.getAndIncrement(); + scannerMap.put(id, scanner); + return id; + } + + /** + * Returns the Scanner associated with the specified Id. + * + * @param id of the Scanner to get + * + * @return a Scanner, or null if the Id is invalid + */ + private ResultScanner getScanner(int id) { + return scannerMap.get(id); + } + + /** + * Removes the scanner associated with the specified ID from the internal HashMap. + * + * @param id of the Scanner to remove + * + * @return the removed Scanner, or null if the Id is invalid + */ + protected ResultScanner removeScanner(int id) { + return scannerMap.remove(id); + } + + @Override + public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + return htable.exists(getFromThrift(get)); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public TResult get(ByteBuffer table, TGet get) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + return resultFromHBase(htable.get(getFromThrift(get))); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public List getMultiple(ByteBuffer table, List gets) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + return resultsFromHBase(htable.get(getsFromThrift(gets))); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public TResult getRowOrBefore(ByteBuffer table, ByteBuffer row, ByteBuffer family) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + return resultFromHBase(htable.getRowOrBefore(row.array(), family.array())); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public void put(ByteBuffer table, TPut put) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + htable.put(putFromThrift(put)); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, + ByteBuffer value, TPut put) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + + try { + // TODO: Is the value really null? + if (value == null) { + return htable.checkAndPut(row.array(), family.array(), qualifier.array(), null, putFromThrift(put)); + } else { + return htable.checkAndPut(row.array(), family.array(), qualifier.array(), value.array(), putFromThrift(put)); + } + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public void putMultiple(ByteBuffer table, List puts) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + htable.put(putsFromThrift(puts)); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + htable.delete(deleteFromThrift(deleteSingle)); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public List deleteMultiple(ByteBuffer table, List deletes) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + List tempDeletes = deletesFromThrift(deletes); + try { + htable.delete(tempDeletes); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + return deletesFromHBase(tempDeletes); + } + + @Override + public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, + ByteBuffer value, TDelete deleteSingle) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + + try { + if (value == null) { + return htable.checkAndDelete(row.array(), family.array(), qualifier.array(), null, + deleteFromThrift(deleteSingle)); + } else { + return htable.checkAndDelete(row.array(), family.array(), qualifier.array(), value.array(), + deleteFromThrift(deleteSingle)); + } + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public long incrementColumnValue(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, + long amount, boolean writeToWal) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + try { + return htable.incrementColumnValue(row.array(), family.array(), qualifier.array(), amount, writeToWal); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + } + + @Override + public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException { + HTableInterface htable = getTable(table.array()); + ResultScanner resultScanner = null; + try { + resultScanner = htable.getScanner(scanFromThrift(scan)); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } finally { + putTable(htable); + } + return addScanner(resultScanner); + } + + @Override + public List getScannerRows(int scannerId, int numRows) throws TIOError, TIllegalArgument, TException { + ResultScanner scanner = getScanner(scannerId); + if (scanner == null) { + throw new TIllegalArgument("Invalid scanner Id"); + } + + try { + return resultsFromHBase(scanner.next(numRows)); + } catch (IOException e) { + throw new TIOError(e.getMessage()); + } + } + + @Override + public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException { + if (removeScanner(scannerId) == null) { + throw new TIllegalArgument("Invalid scanner Id"); + } + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java new file mode 100644 index 0000000..1ac0803 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift2; + +import org.apache.commons.cli.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.thrift2.generated.HbaseClient; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.THsHaServer; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.*; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; + +/** + * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the + * HbaseClient.thrift IDL file. + */ +public class ThriftServer { + + public ThriftServer() { + throw new UnsupportedOperationException("Can't initialize class"); + } + + private static void printUsageAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("Thrift", null, options, + "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" + + "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" + + " send a kill signal to the thrift server pid", + true); + System.exit(exitCode); + } + + private static final String DEFAULT_LISTEN_PORT = "9090"; + + /* + * Start up the Thrift server. + * @param args + */ + private static void doMain(String[] args) throws Exception { + Log LOG = LogFactory.getLog("ThriftServer"); + + Options options = new Options(); + options.addOption("b", "bind", true, + "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]"); + options.addOption("p", "port", true, "Port to bind to [default: 9090]"); + options.addOption("f", "framed", false, "Use framed transport"); + options.addOption("c", "compact", false, "Use the compact protocol"); + options.addOption("h", "help", false, "Print help information"); + + OptionGroup servers = new OptionGroup(); + servers.addOption( + new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); + servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport.")); + servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default.")); + options.addOptionGroup(servers); + + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args); + + /** + * This is so complicated to please both bin/hbase and bin/hbase-daemon. + * hbase-daemon provides "start" and "stop" arguments + * hbase should print the help if no argument is provided + */ + List commandLine = Arrays.asList(args); + boolean stop = commandLine.contains("stop"); + boolean start = commandLine.contains("start"); + if (cmd.hasOption("help") || !start || stop) { + printUsageAndExit(options, 1); + } + + // Get port to bind to + int listenPort = 0; + try { + listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT)); + } catch (NumberFormatException e) { + LOG.error("Could not parse the value provided for the port option", e); + printUsageAndExit(options, -1); + } + + // Construct correct ProtocolFactory + TProtocolFactory protocolFactory; + if (cmd.hasOption("compact")) { + LOG.debug("Using compact protocol"); + protocolFactory = new TCompactProtocol.Factory(); + } else { + LOG.debug("Using binary protocol"); + protocolFactory = new TBinaryProtocol.Factory(); + } + + HbaseClient.Iface handler = new ThriftHBaseClientHandler(); + HbaseClient.Processor processor = new HbaseClient.Processor(handler); + + TServer server; + if (cmd.hasOption("nonblocking") || cmd.hasOption("hsha")) { + // TODO: Remove once HBASE-2155 is resolved + if (cmd.hasOption("bind")) { + LOG.error("The Nonblocking and HsHa servers don't support IP address binding at the moment." + + " See https://issues.apache.org/jira/browse/HBASE-2155 for details."); + printUsageAndExit(options, -1); + } + + TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(listenPort); + TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(); + + if (cmd.hasOption("nonblocking")) { + LOG.info("starting HBase Nonblocking Thrift server on " + Integer.toString(listenPort)); + server = new TNonblockingServer(processor, serverTransport, transportFactory, protocolFactory); + } else { + LOG.info("starting HBase HsHA Thrift server on " + Integer.toString(listenPort)); + server = new THsHaServer(processor, serverTransport, transportFactory, protocolFactory); + } + } else { + // Get IP address to bind to + InetAddress listenAddress = null; + if (cmd.hasOption("bind")) { + try { + listenAddress = InetAddress.getByName(cmd.getOptionValue("bind")); + } catch (UnknownHostException e) { + LOG.error("Could not bind to provided ip address", e); + printUsageAndExit(options, -1); + } + } else { + listenAddress = InetAddress.getLocalHost(); + } + TServerTransport serverTransport = new TServerSocket(new InetSocketAddress(listenAddress, listenPort)); + + // Construct correct TransportFactory + TTransportFactory transportFactory; + if (cmd.hasOption("framed")) { + transportFactory = new TFramedTransport.Factory(); + LOG.debug("Using framed transport"); + } else { + transportFactory = new TTransportFactory(); + } + + LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort)); + server = new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory); + } + + server.serve(); + } + + public static void main(String[] args) throws Exception { + doMain(args); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftUtilities.java b/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftUtilities.java new file mode 100644 index 0000000..f48af6e --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftUtilities.java @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift2; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.thrift2.generated.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +public class ThriftUtilities { + + private ThriftUtilities() { + throw new UnsupportedOperationException("Can't initialize class"); + } + +/* + */ +/** + * This utility method creates a new Thrift TableDescriptor "struct" based on + * a HBase HTableDescriptor object. + * + * @param in HBase HTableDescriptor object + * @return Thrift TableDescriptor + *//* + + public static TTableDescriptor tableDescFromHbase(HTableDescriptor in) { + TTableDescriptor out = new TTableDescriptor(in.getName(), in.isDeferredLogFlush(), + in.getMaxFileSize(), in.getMemStoreFlushSize(), null); + + for (HColumnDescriptor columnDescriptor : in.getFamilies()) { + out.addToFamilies(colDescFromHbase(columnDescriptor)); + } + return out; + } + + public static HTableDescriptor tableDescFromThrift(TTableDescriptor in) { + HTableDescriptor out = new HTableDescriptor(in.getName()); + out.setDeferredLogFlush(in.isDeferredLogFlush()); + out.setMaxFileSize(in.getMaxFileSize()); + out.setMemStoreFlushSize(in.getMemStoreFlushSize()); + out.setReadOnly(in.isReadOnly()); + + for (TColumnFamilyDescriptor colDesc : in.getFamilies()) { + out.addFamily(colDescFromThrift(colDesc)); + } + + return out; + } + + */ +/** + * This utility method creates a new Thrift ColumnDescriptor "struct" based on + * an Hbase HColumnDescriptor object. + * + * @param in HBase HColumnDescriptor object + * @return Thrift ColumnDescriptor + *//* + + public static TColumnFamilyDescriptor colDescFromHbase(HColumnDescriptor in) { + return new TColumnFamilyDescriptor(in.getName(), + TCompressionAlgorithm.valueOf(in.getCompression().toString()), in.getMaxVersions(), + in.getBlocksize(), in.isInMemory(), in.getTimeToLive(), in.isBlockCacheEnabled(), + in.isBloomfilter()); + } + + */ +/** + * This utility method creates a new Hbase HColumnDescriptor object based on a + * Thrift ColumnDescriptor "struct". + * + * @param in Thrift ColumnDescriptor object + * @return HColumnDescriptor + * @throws IllegalArgument + *//* + + public static HColumnDescriptor colDescFromThrift(TColumnFamilyDescriptor in) { + HColumnDescriptor out = new HColumnDescriptor(in.getName()); + out.setMaxVersions(in.getMaxVersions()); + out.setCompressionType(Compression.Algorithm.valueOf(in.getCompression().toString())); + out.setBlockCacheEnabled(in.isBlockCacheEnabled()); + out.setBlocksize(in.getBlocksize()); + out.setBloomfilter(in.isBloomfilterEnabled()); + out.setInMemory(in.isInMemory()); + int ttl = (in.getTimeToLive() != -1) ? in.getTimeToLive() : HConstants.FOREVER; + out.setTimeToLive(ttl); + return out; + } +*/ + + /** + * Creates a {@link Get} (HBase) from a {@link TGet} (Thrift). + * + * This ignores any timestamps set on {@link TColumn} objects. + * + * @param in the TGet to convert + * + * @return Get object + * + * @throws IOException if an invalid time range or max version parameter is given + */ + public static Get getFromThrift(TGet in) throws IOException { + Get out = new Get(in.getRow()); + + // Timestamp overwrites time range if both are set + if (in.isSetTimestamp()) { + out.setTimeStamp(in.getTimestamp()); + } else if (in.isSetTimeRange()) { + out.setTimeRange(in.getTimeRange().getMinStamp(), in.getTimeRange().getMaxStamp()); + } + + if (in.isSetMaxVersions()) { + out.setMaxVersions(in.getMaxVersions()); + } + + if (!in.isSetColumns()) { + return out; + } + + for (TColumn column : in.getColumns()) { + if (column.isSetQualifier()) { + out.addColumn(column.getFamily(), column.getQualifier()); + } else { + out.addFamily(column.getFamily()); + } + } + + return out; + } + + /** + * Converts multiple {@link TGet}s (Thrift) into a list of {@link Get}s (HBase). + * + * @param in list of TGets to convert + * + * @return list of Get objects + * + * @throws IOException if an invalid time range or max version parameter is given + * @see #getFromThrift(TGet) + */ + public static List getsFromThrift(List in) throws IOException { + List out = new ArrayList(in.size()); + for (TGet get : in) { + out.add(getFromThrift(get)); + } + return out; + } + + /** + * Creates a {@link TResult} (Thrift) from a {@link Result} (HBase). + * + * @param in the Result to convert + * + * @return converted result, returns an empty result if the input is null + */ + public static TResult resultFromHBase(Result in) { + TResult out = new TResult(); + + out.setRow(in.getRow()); + List values = new ArrayList(); + + // Map>> + for (Map.Entry>> family : in.getMap().entrySet()) { + for (Map.Entry> qualifier : family.getValue().entrySet()) { + for (Map.Entry timestamp : qualifier.getValue().entrySet()) { + TColumnValue col = new TColumnValue(); + col.setFamily(family.getKey()); + col.setQualifier(qualifier.getKey()); + col.setTimestamp(timestamp.getKey()); + values.add(col); + } + } + } + + out.setEntries(values); + return out; + } + + /** + * Converts multiple {@link Result}s (HBase) into a list of {@link TResult}s (Thrift). + * + * @param in array of Results to convert + * + * @return list of converted TResults + * + * @see #resultFromHBase(Result) + */ + public static List resultsFromHBase(Result[] in) { + List out = new ArrayList(in.length); + for (Result result : in) { + out.add(resultFromHBase(result)); + } + return out; + } + + /** + * Creates a {@link Put} (HBase) from a {@link TPut} (Thrift) + * + * @param in the TPut to convert + * + * @return converted Put + */ + public static Put putFromThrift(TPut in) { + Put out; + + if (in.isSetTimestamp()) { + out = new Put(in.getRow(), in.getTimestamp(), null); + } else { + out = new Put(in.getRow()); + } + + out.setWriteToWAL(in.isWriteToWal()); + + for (TColumnValue columnValue : in.getColumnValues()) { + if (columnValue.isSetTimestamp()) { + out.add(columnValue.getFamily(), columnValue.getQualifier(), columnValue.getTimestamp(), + columnValue.getValue()); + } else { + out.add(columnValue.getFamily(), columnValue.getQualifier(), columnValue.getValue()); + } + } + + return out; + } + + /** + * Converts multiple {@link TPut}s (Thrift) into a list of {@link Put}s (HBase). + * + * @param in list of TPuts to convert + * + * @return list of converted Puts + * + * @see #putFromThrift(TPut) + */ + public static List putsFromThrift(List in) { + List out = new ArrayList(in.size()); + for (TPut put : in) { + out.add(putFromThrift(put)); + } + return out; + } + + /** + * Creates a {@link Delete} (HBase) from a {@link TDelete} (Thrift). + * + * @param in the TDelete to convert + * + * @return converted Delete + */ + public static Delete deleteFromThrift(TDelete in) { + Delete out; + + if (in.isSetColumns()) { + out = new Delete(in.getRow()); + for (TColumn column : in.getColumns()) { + if (column.isSetQualifier()) { + if (column.isSetTimestamp()) { + out.deleteColumn(column.getFamily(), column.getQualifier(), column.getTimestamp()); + } else { + out.deleteColumn(column.getFamily(), column.getQualifier()); + } + + } else { + if (column.isSetTimestamp()) { + out.deleteFamily(column.getFamily(), column.getTimestamp()); + } else { + out.deleteFamily(column.getFamily()); + } + } + } + } else { + if (in.isSetTimestamp()) { + out = new Delete(in.getRow(), in.getTimestamp(), null); + } else { + out = new Delete(in.getRow()); + } + } + + return out; + } + + /** + * Converts multiple {@link TDelete}s (Thrift) into a list of {@link Delete}s (HBase). + * + * @param in list of TDeletes to convert + * + * @return list of converted Deletes + * + * @see #deleteFromThrift(TDelete) + */ + + public static List deletesFromThrift(List in) { + List out = new ArrayList(in.size()); + for (TDelete delete : in) { + out.add(deleteFromThrift(delete)); + } + return out; + } + + // TODO: Not yet entirely sure what the best way to do this is + public static TDelete deleteFromHBase(Delete in) { + TDelete out = new TDelete(ByteBuffer.wrap(in.getRow())); + + List columns = new ArrayList(); + + // Map> + for (Map.Entry> listEntry : in.getFamilyMap().entrySet()) { + TColumn column = new TColumn(ByteBuffer.wrap(listEntry.getKey())); + for (KeyValue keyValue : listEntry.getValue()) { + if (keyValue.isDeleteFamily() && keyValue.getTimestamp() != HConstants.LATEST_TIMESTAMP) { + column.setTimestamp(keyValue.getTimestamp()); + } + } + } + + return out; + } + + public static List deletesFromHBase(List in) { + List out = new ArrayList(in.size()); + for (Delete delete : in) { + if (delete == null) { + out.add(null); + } else { + out.add(deleteFromHBase(delete)); + } + } + return out; + } + + public static Scan scanFromThrift(TScan in) { + Scan out = new Scan(); + + if (in.isSetStartRow()) out.setStartRow(in.getStartRow()); + if (in.isSetStopRow()) out.setStopRow(in.getStopRow()); + if (in.isSetCaching()) out.setCaching(in.getCaching()); + + // TODO: Timestamps + if (in.isSetColumns()) { + for (TColumn column : in.getColumns()) { + if (column.isSetQualifier()) { + out.addColumn(column.getFamily(), column.getQualifier()); + } else { + out.addFamily(column.getFamily()); + } + } + } + + return out; + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift2/package.html b/src/main/java/org/apache/hadoop/hbase/thrift2/package.html new file mode 100644 index 0000000..37434c3 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift2/package.html @@ -0,0 +1,78 @@ + + + + + + + +Provides an HBase Thrift +service. + +This directory contains a Thrift interface definition file for an Hbase RPC +service and a Java server implementation. + +

What is Thrift?

+ +

"Thrift is a software framework for scalable cross-language services +development. It combines a powerful software stack with a code generation +engine to build services that work efficiently and seamlessly between C++, +Java, Python, PHP, and Ruby. Thrift was developed at Facebook, and we are now +releasing it as open source." For additional information, see +http://developers.facebook.com/thrift/. Facebook has announced their intent +to migrate Thrift into Apache Incubator. +

+ +

Description

+ +

The Hbase API is defined in the +file Hbase.thrift. A server-side implementation of the API is in +org.apache.hadoop.hbase.thrift2.ThriftServer. The generated interfaces, +types, and RPC utility files are checked into SVN under the +org.apache.hadoop.hbase.thrift2.generated directory. + +

+ +

The files were generated by running the commands: +

+  thrift -strict --gen java Hbase.thrift
+  mv gen-java/org/apache/hadoop/hbase/thrift/generated .
+  rm -rf gen-java
+
+

+ +

The 'thrift' binary is the Thrift compiler, and it is distributed as a part +of +the Thrift package. Additionally, specific language runtime libraries are a +part of the Thrift package. A version of the Java runtime is checked into SVN +under the hbase/lib directory. +

+ +

To start ThriftServer, use: +

+  ./bin/hbase-daemon.sh start thrift [--port=PORT]
+
+The default port is 9090. +

+ +

To stop, use: +

+  ./bin/hbase-daemon.sh stop thrift
+
+

+ + diff --git a/src/main/resources/org/apache/hadoop/hbase/thrift2/HbaseAdmin.thrift b/src/main/resources/org/apache/hadoop/hbase/thrift2/HbaseAdmin.thrift new file mode 100644 index 0000000..d48ae94 --- /dev/null +++ b/src/main/resources/org/apache/hadoop/hbase/thrift2/HbaseAdmin.thrift @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace java org.apache.hadoop.hbase.thrift2.generated +namespace cpp apache.hadoop.hbase.thrift +namespace rb Apache.Hadoop.Hbase.Thrift +namespace py hbase +namespace perl Hbase + +include "Types.thrift" +include "HbaseClient.thrift" + +service HbaseAdminClient extends HbaseClient { + // isTableEnabled + // getRegionLocation + // getTableDescriptor + // getStartKeys, getEndKeys + +} \ No newline at end of file diff --git a/src/main/resources/org/apache/hadoop/hbase/thrift2/HbaseClient.thrift b/src/main/resources/org/apache/hadoop/hbase/thrift2/HbaseClient.thrift new file mode 100644 index 0000000..de10a2f --- /dev/null +++ b/src/main/resources/org/apache/hadoop/hbase/thrift2/HbaseClient.thrift @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// NOTE: The "required" and "optional" keywords for the service methods are purely for documentation + +namespace java org.apache.hadoop.hbase.thrift2.generated +namespace cpp apache.hadoop.hbase.thrift2 +namespace rb Apache.Hadoop.Hbase.Thrift2 +namespace py hbase +namespace perl Hbase + +include "Types.thrift" + +service HbaseClient { + + /** + * Test for the existence of columns in the table, as specified in the TGet. + * + * @return true if the specified TGet matches one or more keys, false if not + */ + bool exists( + /** the table to check on */ + 1: required Types.Bytes table, + + /** the TGet to check for */ + 2: required Types.TGet get + ) throws (1:Types.TIOError io) + + /** + * Method for getting data from a row. + * + * If the row cannot be found an empty Result is returned. + * This can be checked by the empty field of the TResult + * + * @return the result + */ + Types.TResult get( + /** the table to get from */ + 1: required Types.Bytes table, + + /** the TGet to fetch */ + 2: required Types.TGet get + ) throws (1: Types.TIOError io) + + /** + * Method for getting multiple rows. + * + * If a row cannot be found there will be a null + * value in the result list for that TGet at the + * same position. + * + * So the Results are in the same order as the TGets. + */ + list getMultiple( + /** the table to get from */ + 1: required Types.Bytes table, + + /** a list of TGets to fetch, the Result list + will have the Results at corresponding positions + or null if there was an error */ + 2: required list gets + ) throws (1: Types.TIOError io) + + /** + * Return the row that matches row exactly, + * or the one that immediately precedes it. + */ + Types.TResult getRowOrBefore( + /** the table to get from */ + 1: required Types.Bytes table, + + /** the row key to get or the one preceding it */ + 2: required Types.Bytes row, + + /** the column family to get */ + 3: required Types.Bytes family + ) throws (1: Types.TIOError io) + + /** + * Commit a TPut to a table. + */ + void put( + /** the table to put data in */ + 1: required Types.Bytes table, + + /** the TPut to put */ + 2: required Types.TPut put + ) throws (1: Types.TIOError io) + + /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the TPut. + * + * @return true if the new put was executed, false otherwise + */ + bool checkAndPut( + /** to check in and put to */ + 1: required Types.Bytes table, + + /** row to check */ + 2: required Types.Bytes row, + + /** column family to check */ + 3: required Types.Bytes family, + + /** column qualifier to check */ + 4: required Types.Bytes qualifier, + + /** the expected value, if not provided the + check is for the non-existence of the + column in question */ + 5: optional Types.Bytes value, + + /** the TPut to put if the check succeeds */ + 6: required Types.TPut put + ) throws (1: Types.TIOError io) + + /** + * Commit a List of Puts to the table. + */ + void putMultiple( + /** the table to put data in */ + 1: required Types.Bytes table, + + /** a list of TPuts to commit */ + 2: required list puts + ) throws (1: Types.TIOError io) + + /** + * Deletes as specified by the TDelete. + * + * Note: "delete" is a reserved keyword and cannot be used in Thrift + * thus the inconsistent naming scheme from the other functions. + */ + void deleteSingle( + /** the table to delete from */ + 1: required Types.Bytes table, + + /** the TDelete to delete */ + 2: required Types.TDelete deleteSingle + ) throws (1: Types.TIOError io) + + /** + * Bulk commit a List of TDeletes to the table. + * + * This returns a list of TDeletes that were not + * executed. So if everything succeeds you'll + * receive an empty list. + */ + list deleteMultiple( + /** the table to delete from */ + 1: required Types.Bytes table, + + /** list of TDeletes to delete */ + 2: required list deletes + ) throws (1: Types.TIOError io) + + /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the delete. + * + * @return true if the new delete was executed, false otherwise + */ + bool checkAndDelete( + /** to check in and delete from */ + 1: required Types.Bytes table, + + /** row to check */ + 2: required Types.Bytes row, + + /** column family to check */ + 3: required Types.Bytes family, + + /** column qualifier to check */ + 4: required Types.Bytes qualifier, + + /** the expected value, if not provided the + check is for the non-existence of the + column in question */ + 5: optional Types.Bytes value, + + /** the TDelete to execute if the check succeeds */ + 6: required Types.TDelete deleteSingle + ) throws (1: Types.TIOError io) + + /** + * Atomically increments a single column by a user provided amount. + */ + i64 incrementColumnValue( + /** the table to increment the value on */ + 1: required Types.Bytes table, + + /** the row where the value should be incremented */ + 2: required Types.Bytes row, + + /** the family in the row where the value should be incremented */ + 3: required Types.Bytes family, + + /** the column qualifier where the value should be incremented */ + 4: required Types.Bytes qualifier, + + /** the amount by which the value should be incremented */ + 5: optional i64 amount = 1, + + /** if this increment should be written to the WAL or not */ + 6: optional bool writeToWal = 1 + ) throws (1: Types.TIOError io) + + /** + * Get a Scanner for the provided TScan object. + * + * @return Scanner Id to be used with other scanner procedures + */ + i32 openScanner( + /** the table to get the Scanner for */ + 1: required Types.Bytes table, + + /** the scan object to get a Scanner for */ + 2: required Types.TScan scan, + ) throws (1: Types.TIOError io) + + /** + * Grabs multiple rows from a Scanner. + * + * @return Between zero and numRows TResults + */ + list getScannerRows( + /** the Id of the Scanner to return rows from. This is an Id returned from the openScanner function. */ + 1: required i32 scannerId, + + /** number of rows to return */ + 2: optional i32 numRows = 1 + ) throws ( + 1: Types.TIOError io, + + /** if the scannerId is invalid */ + 2: Types.TIllegalArgument ia + ) + + /** + * Closes the scanner. Should be called if you need to close + * the Scanner before all results are read. + * + * Exhausted scanners are closed automatically. + */ + void closeScanner( + /** the Id of the Scanner to close **/ + 1: required i32 scannerId + ) throws ( + 1: Types.TIOError io, + + /** if the scannerId is invalid */ + 2: Types.TIllegalArgument ia + ) + +} \ No newline at end of file diff --git a/src/main/resources/org/apache/hadoop/hbase/thrift2/Types.thrift b/src/main/resources/org/apache/hadoop/hbase/thrift2/Types.thrift new file mode 100644 index 0000000..00ab5d8 --- /dev/null +++ b/src/main/resources/org/apache/hadoop/hbase/thrift2/Types.thrift @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace java org.apache.hadoop.hbase.thrift2.generated +namespace cpp apache.hadoop.hbase.thrift2 +namespace rb Apache.Hadoop.Hbase.Thrift2 +namespace py hbase +namespace perl Hbase + +const string VERSION = "2.0.0" + +typedef binary Bytes + +struct TTimeRange { + 1: required i64 minStamp, + 2: required i64 maxStamp +} + +/** + * Addresses a single cell or multiple cells + * in a HBase table by column family and optionally + * a column qualifier and timestamp + */ +struct TColumn { + 1: required Bytes family, + 2: optional Bytes qualifier, + 3: optional i64 timestamp +} + +/** + * Represents a single cell and its value. + */ +struct TColumnValue { + 1: required Bytes family, + 2: required Bytes qualifier, + 3: required Bytes value, + 4: optional i64 timestamp +} + +/** + * TODO: optional bool "empty"? + */ +struct TResult { + 1: required Bytes row, + 2: required list entries +} + +/** + * Used to perform Get operations on a single row. + * + * The scope can be further narrowed down by specifying a list of + * columns or column families. + * + * To get everything for a row, instantiate a Get object with just the row to get. + * To further define the scope of what to get you can add a timestamp or time range + * with an optional maximum number of versions to return. + * + * If you specify a time range and a timestamp the range is ignored. + * Timestamps on TColumns are ignored. + * + * TODO: Filter, Locks + */ +struct TGet { + 1: required Bytes row, + 2: optional list columns, + + 3: optional i64 timestamp, + 4: optional TTimeRange timeRange, + + 5: optional i32 maxVersions, +} + +/** + * Used to perform Put operations for a single row. + * + * Add column values to this object and they'll be added. + * You can provide a default timestamp if the column values + * don't have one. If you don't provide a default timestamp + * the current time is inserted. + * + * You can also define it this Put should be written + * to the write-ahead Log (WAL) or not. It defaults to true. + */ +struct TPut { + 1: required Bytes row, + 2: required list columnValues + 3: optional i64 timestamp, + 4: optional bool writeToWal = 1 +} + +/** + * Used to perform Delete operations on a single row. + * + * The scope can be further narrowed down by specifying a list of + * columns or column families as TColumns. + * + * Specifying only a family in a TColumn will delete the whole family. + * If a timestamp is specified all versions with a timestamp less than + * or equal to this will be deleted. If no timestamp is specified the + * current time will be used. + * + * Specifying a family and a column qualifier in a TColumn will delete only + * this qualifier. If a timestamp is specified only versions equal + * to this timestamp will be deleted. If no timestamp is specified the + * most recent version will be deleted. + * + * The top level timestamp is only used if a complete row should be deleted + * (i.e. no columns are passed) and if it is specified it works the same way + * as if you had added a TColumn for every column family and this timestamp + * (i.e. all versions older than or equal in all column families will be deleted) + * + * TODO: This is missing the KeyValue.Type.DeleteColumn semantic. I could add a DeleteType or something like that + */ +struct TDelete { + 1: required Bytes row, + 2: optional list columns + 3: optional i64 timestamp +} + +/** + * TODO: Filter + */ +struct TScan { + 1: optional Bytes startRow, + 2: optional Bytes stopRow, + 3: optional list columns + 4: optional i32 caching, + 5: optional i32 maxVersions, + 6: optional TTimeRange timeRange, +} + +// +// Exceptions +// + +/** + * A TIOError exception signals that an error occurred communicating + * to the HBase master or a HBase region server. Also used to return + * more general HBase error conditions. + */ +exception TIOError { + 1: optional string message +} + +/** + * A TIllegalArgument exception indicates an illegal or invalid + * argument was passed into a procedure. + */ +exception TIllegalArgument { + 1: optional string message +} \ No newline at end of file diff --git a/test/java/org/apache/hadoop/hbase/thrift2/TestThriftServer.java b/test/java/org/apache/hadoop/hbase/thrift2/TestThriftServer.java new file mode 100644 index 0000000..99ea7e1 --- /dev/null +++ b/test/java/org/apache/hadoop/hbase/thrift2/TestThriftServer.java @@ -0,0 +1,406 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.thrift2; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.util.Bytes; +import static org.junit.Assert.*; +import org.junit.Test; +import org.junit.Before; + +/** + * Unit testing for ThriftServer.HBaseHandler, a part of the + * org.apache.hadoop.hbase.thrift2 package. + */ +public class TestThriftServer { + + // Static names for tables, columns, rows, and values + private static byte[] tableAname = Bytes.toBytes("tableA"); + private static byte[] tableBname = Bytes.toBytes("tableB"); + private static byte[] columnAname = Bytes.toBytes("columnA:"); + private static byte[] columnBname = Bytes.toBytes("columnB:"); + private static byte[] rowAname = Bytes.toBytes("rowA"); + private static byte[] rowBname = Bytes.toBytes("rowB"); + private static byte[] valueAname = Bytes.toBytes("valueA"); + private static byte[] valueBname = Bytes.toBytes("valueB"); + private static byte[] valueCname = Bytes.toBytes("valueC"); + private static byte[] valueDname = Bytes.toBytes("valueD"); + + + private HBaseTestingUtility hbaseTestingUtility; + + /** + * Runs all of the tests under a single JUnit test method. We + * consolidate all testing to one method because HBaseClusterTestCase + * is prone to OutOfMemoryExceptions when there are three or more + * JUnit test methods. + * + * @throws Exception + + public void testAll() throws Exception { + // Run all tests + doTestTableCreateDrop(); + doTestTableMutations(); + doTestTableTimestampsAndColumns(); + doTestTableScanners(); + } + */ + + @Before + public void setUp() throws Exception { + hbaseTestingUtility = new HBaseTestingUtility(); + hbaseTestingUtility.startMiniCluster(); + } + + /** + * Tests for creating, enabling, disabling, and deleting tables. Also + * tests that creating a table with an invalid column name yields an + * IllegalArgument exception. + * + * @throws Exception + */ + @Test + public void doTestTableCreateDrop() throws Exception { + ThriftServer.HBaseHandler handler = new ThriftServer.HBaseHandler(); + + // Create/enable/disable/delete tables, ensure methods act correctly + assertTrue(handler.isMasterRunning()); + + /* + assertEquals(handler.getTableNames().size(), 0); + handler.createTable(tableAname, getColumnDescriptors()); + assertEquals(handler.getTableNames().size(), 1); + assertEquals(handler.getColumnDescriptors(tableAname).size(), 2); + assertTrue(handler.isTableEnabled(tableAname)); + handler.createTable(tableBname, new ArrayList()); + assertEquals(handler.getTableNames().size(), 2); + handler.disableTable(tableBname); + assertFalse(handler.isTableEnabled(tableBname)); + handler.deleteTable(tableBname); + assertEquals(handler.getTableNames().size(), 1); + handler.disableTable(tableAname); + assertFalse(handler.isTableEnabled(tableAname)); + handler.enableTable(tableAname); + assertTrue(handler.isTableEnabled(tableAname)); + handler.disableTable(tableAname); + handler.deleteTable(tableAname); + */ + } + + /** + * Tests adding a series of Mutations and BatchMutations, including a + * delete mutation. Also tests data retrieval, and getting back multiple + * versions. + * + * @throws Exception + + public void doTestTableMutations() throws Exception { + // Setup + ThriftServer.HBaseHandler handler = new ThriftServer.HBaseHandler(); + handler.createTable(tableAname, getColumnDescriptors()); + + // Apply a few Mutations to rowA + // mutations.add(new Mutation(false, columnAname, valueAname)); + // mutations.add(new Mutation(false, columnBname, valueBname)); + handler.mutateRow(tableAname, rowAname, getMutations()); + + // Assert that the changes were made + assertTrue(Bytes.equals(valueAname, + handler.get(tableAname, rowAname, columnAname).get(0).value)); + TRowResult rowResult1 = handler.getRow(tableAname, rowAname).get(0); + assertTrue(Bytes.equals(rowAname, rowResult1.row)); + assertTrue(Bytes.equals(valueBname, + rowResult1.columns.get(columnBname).value)); + + // Apply a few BatchMutations for rowA and rowB + // rowAmutations.add(new Mutation(true, columnAname, null)); + // rowAmutations.add(new Mutation(false, columnBname, valueCname)); + // batchMutations.add(new BatchMutation(rowAname, rowAmutations)); + // Mutations to rowB + // rowBmutations.add(new Mutation(false, columnAname, valueCname)); + // rowBmutations.add(new Mutation(false, columnBname, valueDname)); + // batchMutations.add(new BatchMutation(rowBname, rowBmutations)); + handler.mutateRows(tableAname, getBatchMutations()); + + // Assert that changes were made to rowA + List cells = handler.get(tableAname, rowAname, columnAname); + assertFalse(cells.size() > 0); + assertTrue(Bytes.equals(valueCname, handler.get(tableAname, rowAname, columnBname).get(0).value)); + List versions = handler.getVer(tableAname, rowAname, columnBname, MAXVERSIONS); + assertTrue(Bytes.equals(valueCname, versions.get(0).value)); + assertTrue(Bytes.equals(valueBname, versions.get(1).value)); + + // Assert that changes were made to rowB + TRowResult rowResult2 = handler.getRow(tableAname, rowBname).get(0); + assertTrue(Bytes.equals(rowBname, rowResult2.row)); + assertTrue(Bytes.equals(valueCname, rowResult2.columns.get(columnAname).value)); + assertTrue(Bytes.equals(valueDname, rowResult2.columns.get(columnBname).value)); + + // Apply some deletes + handler.deleteAll(tableAname, rowAname, columnBname); + handler.deleteAllRow(tableAname, rowBname); + + // Assert that the deletes were applied + int size = handler.get(tableAname, rowAname, columnBname).size(); + assertEquals(0, size); + size = handler.getRow(tableAname, rowBname).size(); + assertEquals(0, size); + + // Teardown + handler.disableTable(tableAname); + handler.deleteTable(tableAname); + } + + /** + * Similar to testTableMutations(), except Mutations are applied with + * specific timestamps and data retrieval uses these timestamps to + * extract specific versions of data. + * + * @throws Exception + + public void doTestTableTimestampsAndColumns() throws Exception { + // Setup + ThriftServer.HBaseHandler handler = new ThriftServer.HBaseHandler(); + handler.createTable(tableAname, getColumnDescriptors()); + + // Apply timestamped Mutations to rowA + long time1 = System.currentTimeMillis(); + handler.mutateRowTs(tableAname, rowAname, getMutations(), time1); + + Thread.sleep(1000); + + // Apply timestamped BatchMutations for rowA and rowB + long time2 = System.currentTimeMillis(); + handler.mutateRowsTs(tableAname, getBatchMutations(), time2); + + // Apply an overlapping timestamped mutation to rowB + handler.mutateRowTs(tableAname, rowBname, getMutations(), time2); + + // the getVerTs is [inf, ts) so you need to increment one. + time1 += 1; + time2 += 2; + + // Assert that the timestamp-related methods retrieve the correct data + assertEquals(2, handler.getVerTs(tableAname, rowAname, columnBname, time2, + MAXVERSIONS).size()); + assertEquals(1, handler.getVerTs(tableAname, rowAname, columnBname, time1, + MAXVERSIONS).size()); + + TRowResult rowResult1 = handler.getRowTs(tableAname, rowAname, time1).get(0); + TRowResult rowResult2 = handler.getRowTs(tableAname, rowAname, time2).get(0); + // columnA was completely deleted + //assertTrue(Bytes.equals(rowResult1.columns.get(columnAname).value, valueAname)); + assertTrue(Bytes.equals(rowResult1.columns.get(columnBname).value, valueBname)); + assertTrue(Bytes.equals(rowResult2.columns.get(columnBname).value, valueCname)); + + // ColumnAname has been deleted, and will never be visible even with a getRowTs() + assertFalse(rowResult2.columns.containsKey(columnAname)); + + List columns = new ArrayList(); + columns.add(columnBname); + + rowResult1 = handler.getRowWithColumns(tableAname, rowAname, columns).get(0); + assertTrue(Bytes.equals(rowResult1.columns.get(columnBname).value, valueCname)); + assertFalse(rowResult1.columns.containsKey(columnAname)); + + rowResult1 = handler.getRowWithColumnsTs(tableAname, rowAname, columns, time1).get(0); + assertTrue(Bytes.equals(rowResult1.columns.get(columnBname).value, valueBname)); + assertFalse(rowResult1.columns.containsKey(columnAname)); + + // Apply some timestamped deletes + // this actually deletes _everything_. + // nukes everything in columnB: forever. + handler.deleteAllTs(tableAname, rowAname, columnBname, time1); + handler.deleteAllRowTs(tableAname, rowBname, time2); + + // Assert that the timestamp-related methods retrieve the correct data + int size = handler.getVerTs(tableAname, rowAname, columnBname, time1, MAXVERSIONS).size(); + assertEquals(0, size); + + size = handler.getVerTs(tableAname, rowAname, columnBname, time2, MAXVERSIONS).size(); + assertEquals(1, size); + + // should be available.... + assertTrue(Bytes.equals(handler.get(tableAname, rowAname, columnBname).get(0).value, valueCname)); + + assertEquals(0, handler.getRow(tableAname, rowBname).size()); + + // Teardown + handler.disableTable(tableAname); + handler.deleteTable(tableAname); + } + + /** + * Tests the four different scanner-opening methods (with and without + * a stoprow, with and without a timestamp). + * + * @throws Exception + + public void doTestTableScanners() throws Exception { + // Setup + ThriftServer.HBaseHandler handler = new ThriftServer.HBaseHandler(); + handler.createTable(tableAname, getColumnDescriptors()); + + // Apply timestamped Mutations to rowA + long time1 = System.currentTimeMillis(); + handler.mutateRowTs(tableAname, rowAname, getMutations(), time1); + + // Sleep to assure that 'time1' and 'time2' will be different even with a + // coarse grained system timer. + Thread.sleep(1000); + + // Apply timestamped BatchMutations for rowA and rowB + long time2 = System.currentTimeMillis(); + handler.mutateRowsTs(tableAname, getBatchMutations(), time2); + + time1 += 1; + + // Test a scanner on all rows and all columns, no timestamp + int scanner1 = handler.scannerOpen(tableAname, rowAname, getColumnList(true, true)); + TRowResult rowResult1a = handler.scannerGet(scanner1).get(0); + assertTrue(Bytes.equals(rowResult1a.row, rowAname)); + // This used to be '1'. I don't know why when we are asking for two columns + // and when the mutations above would seem to add two columns to the row. + // -- St.Ack 05/12/2009 + assertEquals(rowResult1a.columns.size(), 1); + assertTrue(Bytes.equals(rowResult1a.columns.get(columnBname).value, valueCname)); + + TRowResult rowResult1b = handler.scannerGet(scanner1).get(0); + assertTrue(Bytes.equals(rowResult1b.row, rowBname)); + assertEquals(rowResult1b.columns.size(), 2); + assertTrue(Bytes.equals(rowResult1b.columns.get(columnAname).value, valueCname)); + assertTrue(Bytes.equals(rowResult1b.columns.get(columnBname).value, valueDname)); + closeScanner(scanner1, handler); + + // Test a scanner on all rows and all columns, with timestamp + int scanner2 = handler.scannerOpenTs(tableAname, rowAname, getColumnList(true, true), time1); + TRowResult rowResult2a = handler.scannerGet(scanner2).get(0); + assertEquals(rowResult2a.columns.size(), 1); + // column A deleted, does not exist. + //assertTrue(Bytes.equals(rowResult2a.columns.get(columnAname).value, valueAname)); + assertTrue(Bytes.equals(rowResult2a.columns.get(columnBname).value, valueBname)); + closeScanner(scanner2, handler); + + // Test a scanner on the first row and first column only, no timestamp + int scanner3 = handler.scannerOpenWithStop(tableAname, rowAname, rowBname, + getColumnList(true, false)); + closeScanner(scanner3, handler); + + // Test a scanner on the first row and second column only, with timestamp + int scanner4 = handler.scannerOpenWithStopTs(tableAname, rowAname, rowBname, + getColumnList(false, true), time1); + TRowResult rowResult4a = handler.scannerGet(scanner4).get(0); + assertEquals(rowResult4a.columns.size(), 1); + assertTrue(Bytes.equals(rowResult4a.columns.get(columnBname).value, valueBname)); + + // Teardown + handler.disableTable(tableAname); + handler.deleteTable(tableAname); + } + + /** + * + * @return a List of ColumnDescriptors for use in creating a table. Has one + * default ColumnDescriptor and one ColumnDescriptor with fewer versions + + private List getColumnDescriptors() { + ArrayList cDescriptors = new ArrayList(); + + // A default ColumnDescriptor + ColumnDescriptor cDescA = new ColumnDescriptor(); + cDescA.name = columnAname; + cDescriptors.add(cDescA); + + // A slightly customized ColumnDescriptor (only 2 versions) + ColumnDescriptor cDescB = new ColumnDescriptor(columnBname, 2, "NONE", + false, "NONE", 0, 0, false, -1); + cDescriptors.add(cDescB); + + return cDescriptors; + } + + /** + * + * @param includeA whether or not to include columnA + * @param includeB whether or not to include columnB + * @return a List of column names for use in retrieving a scanner + + private List getColumnList(boolean includeA, boolean includeB) { + List columnList = new ArrayList(); + if (includeA) columnList.add(columnAname); + if (includeB) columnList.add(columnBname); + return columnList; + } + + /** + * + * @return a List of Mutations for a row, with columnA having valueA + * and columnB having valueB + + private List getMutations() { + List mutations = new ArrayList(); + mutations.add(new Mutation(false, columnAname, valueAname)); + mutations.add(new Mutation(false, columnBname, valueBname)); + return mutations; + } + + /** + * + * @return a List of BatchMutations with the following effects: + * (rowA, columnA): delete + * (rowA, columnB): place valueC + * (rowB, columnA): place valueC + * (rowB, columnB): place valueD + + private List getBatchMutations() { + List batchMutations = new ArrayList(); + + // Mutations to rowA. You can't mix delete and put anymore. + List rowAmutations = new ArrayList(); + rowAmutations.add(new Mutation(true, columnAname, null)); + batchMutations.add(new BatchMutation(rowAname, rowAmutations)); + + rowAmutations = new ArrayList(); + rowAmutations.add(new Mutation(false, columnBname, valueCname)); + batchMutations.add(new BatchMutation(rowAname, rowAmutations)); + + // Mutations to rowB + List rowBmutations = new ArrayList(); + rowBmutations.add(new Mutation(false, columnAname, valueCname)); + rowBmutations.add(new Mutation(false, columnBname, valueDname)); + batchMutations.add(new BatchMutation(rowBname, rowBmutations)); + + return batchMutations; + } + + /** + * Asserts that the passed scanner is exhausted, and then closes + * the scanner. + * + * @param scannerId the scanner to close + * @param handler the HBaseHandler interfacing to HBase + * @throws Exception + + private void closeScanner(int scannerId, ThriftServer.HBaseHandler handler) throws Exception { + handler.scannerGet(scannerId); + handler.scannerClose(scannerId); + } + */ +}