diff -u hbase/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java hbase-thrift2/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java --- hbase/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java 2013-02-07 16:01:00.000000000 +0100 +++ hbase-thrift2/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java 2013-02-07 16:48:44.000000000 +0100 @@ -114,7 +114,7 @@ return result; } } - + private static long now() { return System.nanoTime(); } @@ -136,11 +136,19 @@ } private TIOError getTIOError(IOException e) { + LOG.error("IOException caught:", e); TIOError err = new TIOError(); err.setMessage(e.getMessage()); return err; } + private TIllegalArgument getTIllegalArgument(IllegalArgumentException e) { + LOG.error("IllegalArgumentException caught:", e); + TIllegalArgument err = new TIllegalArgument(); + err.setMessage(e.getMessage()); + return err; + } + /** * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap. * @@ -186,12 +194,14 @@ } @Override - public TResult get(ByteBuffer table, TGet get) throws TIOError, TException { + public TResult get(ByteBuffer table, TGet get) throws TIOError, TIllegalArgument, TException { HTableInterface htable = getTable(table.array()); try { return resultFromHBase(htable.get(getFromThrift(get))); } catch (IOException e) { throw getTIOError(e); + } catch (IllegalArgumentException e) { + throw getTIllegalArgument(e); } finally { closeTable(htable); } @@ -302,14 +312,16 @@ } @Override - public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException { + public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TIllegalArgument, TException { HTableInterface htable = getTable(table.array()); ResultScanner resultScanner = null; try { resultScanner = htable.getScanner(scanFromThrift(scan)); } catch (IOException e) { throw getTIOError(e); - } finally { + } catch (IllegalArgumentException e) { + throw getTIllegalArgument(e); + }finally { closeTable(htable); } return addScanner(resultScanner); @@ -325,7 +337,11 @@ } try { - return resultsFromHBase(scanner.next(numRows)); + List results = resultsFromHBase(scanner.next(numRows)); + if(results.size() < numRows) { + removeScanner(scannerId); + } + return results; } catch (IOException e) { throw getTIOError(e); } @@ -341,3 +357,4 @@ } } + diff -u hbase/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftUtilities.java hbase-thrift2/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftUtilities.java --- hbase/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftUtilities.java 2013-02-07 16:01:00.000000000 +0100 +++ hbase-thrift2/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftUtilities.java 2013-02-07 16:51:08.000000000 +0100 @@ -22,9 +22,15 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.thrift2.generated.*; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; + import java.io.IOException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.*; @@ -59,6 +65,10 @@ if (in.isSetMaxVersions()) { out.setMaxVersions(in.getMaxVersions()); } + + if(in.isSetFilters()) { + out.setFilter(filterListFromThrift(in.getFilters())); + } if (!in.isSetColumns()) { return out; @@ -282,6 +292,54 @@ return out; } + public static Filter filterListFromThrift(TFilterList filterList) { + + // Transform a list of TFilters into a list of Filters + List filters = new ArrayList( + Collections2.transform(filterList.getFilters(), new Function() { + public Filter apply(TFilter tfilter) { + return filterFromThrift(tfilter); + } + }) + ); + + TFilterListOperator tOperator = filterList.getOperator(); + FilterList.Operator operator = null; + if (tOperator == TFilterListOperator.MUST_PASS_ALL) { + operator = FilterList.Operator.MUST_PASS_ALL; + } else if (tOperator == TFilterListOperator.MUST_PASS_ONE) { + operator = FilterList.Operator.MUST_PASS_ONE; + } else { + throw new IllegalArgumentException("Operator is not supported. Available operators: MUST_PASS_ALL, MUST_PASS_ONE"); + } + + return new FilterList(operator, filters); + } + + public static Filter filterFromThrift(TFilter filter) { + // Transform a list of filter args (ByteBuffer objects) into a list of byte[] + ArrayList args = new ArrayList( + Collections2.transform(filter.getArguments(), new Function() { + public byte[] apply(ByteBuffer buff) { + return buff.array(); + } + }) + ); + return initializeFilter(filter.getClassName(), args); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private static Filter initializeFilter(String filterClassName, ArrayList args) { + try { + Class c = Class.forName(filterClassName); + Class[] argTypes = new Class [] {ArrayList.class}; + Method m = c.getDeclaredMethod("createFilterFromArguments", argTypes); + return (Filter) m.invoke(null, args); + } catch (Exception e) { + throw new IllegalArgumentException("Can't initialize a filter " + filterClassName, e); + } + } + public static Scan scanFromThrift(TScan in) throws IOException { Scan out = new Scan(); @@ -295,6 +353,10 @@ out.setMaxVersions(in.getMaxVersions()); } + if (in.isSetFilters()) { + out.setFilter(filterListFromThrift(in.getFilters())); + } + if (in.isSetColumns()) { for (TColumn column : in.getColumns()) { if (column.isSetQualifier()) {