Index: src/test/org/apache/hadoop/hbase/client/TestParallelClient.java =================================================================== --- src/test/org/apache/hadoop/hbase/client/TestParallelClient.java Thu Dec 17 17:46:24 EST 2009 +++ src/test/org/apache/hadoop/hbase/client/TestParallelClient.java Thu Dec 17 17:46:24 EST 2009 @@ -0,0 +1,3006 @@ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import junit.framework.Assert; +import org.apache.hadoop.hbase.client.ParallelHTable; +import org.apache.hadoop.hbase.HBaseClusterTestCase; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.HConstants; + +public class TestParallelClient extends HBaseClusterTestCase { + + private static byte [] ROW = Bytes.toBytes("testRow"); + private static byte [] FAMILY = Bytes.toBytes("testFamily"); + private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte [] VALUE = Bytes.toBytes("testValue"); + + private static byte [] EMPTY = new byte[0]; + + /** + * Constructor does nothing special, start cluster. 
+ */ + public TestParallelClient() { + super(); + } + + public void XtestSuperSimple() throws Exception { + byte [] TABLE = Bytes.toBytes("testSuperSimple"); + HTable ht = createTable(TABLE, FAMILY); + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + Scan scan = new Scan(); + scan.addColumn(FAMILY, TABLE); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertTrue("Expected null result", result == null); + scanner.close(); + System.out.println("Done."); + } + + /** + * Test simple table and non-existent row cases. + */ + public void testSimpleMissing() throws Exception { + + byte [] TABLE = Bytes.toBytes("testSimpleMissing"); + + HTable ht = createTable(TABLE, FAMILY); + + byte [][] ROWS = makeN(ROW, 4); + + // Try to get a row on an empty table + + Get get = new Get(ROWS[0]); + Result result = ht.get(get); + assertEmptyResult(result); + + get = new Get(ROWS[0]); + get.addFamily(FAMILY); + result = ht.get(get); + assertEmptyResult(result); + + get = new Get(ROWS[0]); + get.addColumn(FAMILY, QUALIFIER); + result = ht.get(get); + assertEmptyResult(result); + + Scan scan = new Scan(); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + + scan = new Scan(ROWS[0]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0],ROWS[1]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.addFamily(FAMILY); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.addColumn(FAMILY, QUALIFIER); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Insert a row + + Put put = new Put(ROWS[2]); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + + // Try to get empty rows around it + + get = new Get(ROWS[1]); + result = ht.get(get); + assertEmptyResult(result); + + get = new Get(ROWS[0]); + get.addFamily(FAMILY); + result = ht.get(get); + 
assertEmptyResult(result); + + get = new Get(ROWS[3]); + get.addColumn(FAMILY, QUALIFIER); + result = ht.get(get); + assertEmptyResult(result); + + // Try to scan empty rows around it + + scan = new Scan(ROWS[3]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0],ROWS[2]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Make sure we can actually get the row + + get = new Get(ROWS[2]); + result = ht.get(get); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + get = new Get(ROWS[2]); + get.addFamily(FAMILY); + result = ht.get(get); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + get = new Get(ROWS[2]); + get.addColumn(FAMILY, QUALIFIER); + result = ht.get(get); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + // Make sure we can scan the row + + scan = new Scan(); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[0],ROWS[3]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[2],ROWS[3]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + } + + /** + * Test basic puts, gets, scans, and deletes for a single row + * in a multiple family table. 
+ */ + public void testSingleRowMultipleFamily() throws Exception { + + byte [] TABLE = Bytes.toBytes("testSingleRowMultipleFamily"); + + byte [][] ROWS = makeN(ROW, 3); + byte [][] FAMILIES = makeN(FAMILY, 10); + byte [][] QUALIFIERS = makeN(QUALIFIER, 10); + byte [][] VALUES = makeN(VALUE, 10); + + HTable ht = createTable(TABLE, FAMILIES); + + Get get; + Scan scan; + Delete delete; + Put put; + Result result; + + //////////////////////////////////////////////////////////////////////////// + // Insert one column to one family + //////////////////////////////////////////////////////////////////////////// + + put = new Put(ROWS[0]); + put.add(FAMILIES[4], QUALIFIERS[0], VALUES[0]); + ht.put(put); + + // Get the single column + getVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0); + + // Scan the single column + scanVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0); + + // Get empty results around inserted column + getVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0); + + // Scan empty results around inserted column + scanVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0); + + //////////////////////////////////////////////////////////////////////////// + // Flush memstore and run same tests from storefiles + //////////////////////////////////////////////////////////////////////////// + + flushMemStore(TABLE); + + // Redo get and scan tests from storefile + + getVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0); + scanVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0); + getVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0); + scanVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0); + + //////////////////////////////////////////////////////////////////////////// + // Now, Test reading from memstore and storefiles at once + //////////////////////////////////////////////////////////////////////////// + + // Insert multiple columns to two other families + + 
put = new Put(ROWS[0]); + put.add(FAMILIES[2], QUALIFIERS[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIERS[4], VALUES[4]); + put.add(FAMILIES[4], QUALIFIERS[4], VALUES[4]); + put.add(FAMILIES[6], QUALIFIERS[6], VALUES[6]); + put.add(FAMILIES[6], QUALIFIERS[7], VALUES[7]); + put.add(FAMILIES[7], QUALIFIERS[7], VALUES[7]); + put.add(FAMILIES[9], QUALIFIERS[0], VALUES[0]); + ht.put(put); + + // Get multiple columns across multiple families and get empties around it + singleRowGetTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES); + + // Scan multiple columns across multiple families and scan empties around it + singleRowScanTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES); + + //////////////////////////////////////////////////////////////////////////// + // Flush the table again + //////////////////////////////////////////////////////////////////////////// + + flushMemStore(TABLE); + + // Redo tests again + + singleRowGetTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES); + singleRowScanTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES); + + // Insert more data to memstore + + put = new Put(ROWS[0]); + put.add(FAMILIES[6], QUALIFIERS[5], VALUES[5]); + put.add(FAMILIES[6], QUALIFIERS[8], VALUES[8]); + put.add(FAMILIES[6], QUALIFIERS[9], VALUES[9]); + put.add(FAMILIES[4], QUALIFIERS[3], VALUES[3]); + ht.put(put); + + //////////////////////////////////////////////////////////////////////////// + // Delete a storefile column + //////////////////////////////////////////////////////////////////////////// + delete = new Delete(ROWS[0]); + delete.deleteColumns(FAMILIES[6], QUALIFIERS[7]); + ht.delete(delete); + + // Try to get deleted column + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[6], QUALIFIERS[7]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to scan deleted column + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[7]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Make sure we can still get a column before it and after 
it + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[6], QUALIFIERS[6]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]); + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[6], QUALIFIERS[8]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[8], VALUES[8]); + + // Make sure we can still scan a column before it and after it + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[6]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]); + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[8]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[8], VALUES[8]); + + //////////////////////////////////////////////////////////////////////////// + // Delete a memstore column + //////////////////////////////////////////////////////////////////////////// + delete = new Delete(ROWS[0]); + delete.deleteColumns(FAMILIES[6], QUALIFIERS[8]); + ht.delete(delete); + + // Try to get deleted column + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[6], QUALIFIERS[8]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to scan deleted column + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[8]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Make sure we can still get a column before it and after it + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[6], QUALIFIERS[6]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]); + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[6], QUALIFIERS[9]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]); + + // Make sure we can still scan a column before it and after it + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[6]); + result = 
getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]); + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[9]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]); + + //////////////////////////////////////////////////////////////////////////// + // Delete joint storefile/memstore family + //////////////////////////////////////////////////////////////////////////// + + delete = new Delete(ROWS[0]); + delete.deleteFamily(FAMILIES[4]); + ht.delete(delete); + + // Try to get storefile column in deleted family + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[4], QUALIFIERS[4]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to get memstore column in deleted family + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[4], QUALIFIERS[3]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to get deleted family + get = new Get(ROWS[0]); + get.addFamily(FAMILIES[4]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to scan storefile column in deleted family + + scan = new Scan(); + scan.addColumn(FAMILIES[4], QUALIFIERS[4]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Try to scan memstore column in deleted family + scan = new Scan(); + scan.addColumn(FAMILIES[4], QUALIFIERS[3]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Try to scan deleted family + scan = new Scan(); + scan.addFamily(FAMILIES[4]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Make sure we can still get another family + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[2], QUALIFIERS[2]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]); + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[6], QUALIFIERS[9]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], 
FAMILIES[6], QUALIFIERS[9], VALUES[9]); + + // Make sure we can still scan another family + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[6]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]); + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[9]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]); + + //////////////////////////////////////////////////////////////////////////// + // Flush everything and rerun delete tests + //////////////////////////////////////////////////////////////////////////// + + flushMemStore(TABLE); + + // Try to get storefile column in deleted family + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[4], QUALIFIERS[4]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to get memstore column in deleted family + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[4], QUALIFIERS[3]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to get deleted family + get = new Get(ROWS[0]); + get.addFamily(FAMILIES[4]); + result = ht.get(get); + assertEmptyResult(result); + + // Try to scan storefile column in deleted family + + scan = new Scan(); + scan.addColumn(FAMILIES[4], QUALIFIERS[4]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Try to scan memstore column in deleted family + scan = new Scan(); + scan.addColumn(FAMILIES[4], QUALIFIERS[3]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Try to scan deleted family + scan = new Scan(); + scan.addFamily(FAMILIES[4]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Make sure we can still get another family + + get = new Get(ROWS[0]); + get.addColumn(FAMILIES[2], QUALIFIERS[2]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]); + + get = new Get(ROWS[0]); + 
get.addColumn(FAMILIES[6], QUALIFIERS[9]); + result = ht.get(get); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]); + + // Make sure we can still scan another family + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[6]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]); + + scan = new Scan(); + scan.addColumn(FAMILIES[6], QUALIFIERS[9]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]); + + } + + @SuppressWarnings("unused") + public void testNull() throws Exception { + + byte [] TABLE = Bytes.toBytes("testNull"); + + // Null table name (should NOT work) + try { + HTable htFail = createTable(null, FAMILY); + assertTrue("Creating a table with null name passed, should have failed", + false); + } catch(Exception e) {} + + // Null family (should NOT work) + try { + HTable htFail = createTable(TABLE, (byte[])null); + assertTrue("Creating a table with a null family passed, should fail", + false); + } catch(Exception e) {} + + HTable ht = createTable(TABLE, FAMILY); + + // Null row (should NOT work) + try { + Put put = new Put((byte[])null); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + assertTrue("Inserting a null row worked, should throw exception", + false); + } catch(Exception e) {} + + // Null qualifier (should work) + try { + + Put put = new Put(ROW); + put.add(FAMILY, null, VALUE); + ht.put(put); + + getTestNull(ht, ROW, FAMILY, VALUE); + + scanTestNull(ht, ROW, FAMILY, VALUE); + + Delete delete = new Delete(ROW); + delete.deleteColumns(FAMILY, null); + ht.delete(delete); + + Get get = new Get(ROW); + Result result = ht.get(get); + assertEmptyResult(result); + + } catch(Exception e) { + e.printStackTrace(); + assertTrue("Using a row with null qualifier threw exception, should " + + "pass", false); + } + + // Use a new table + + byte [] TABLE2 = Bytes.toBytes("testNull2"); + ht = 
createTable(TABLE2, FAMILY); + + // Empty qualifier, byte[0] instead of null (should work) + try { + + Put put = new Put(ROW); + put.add(FAMILY, EMPTY, VALUE); + ht.put(put); + + getTestNull(ht, ROW, FAMILY, VALUE); + + scanTestNull(ht, ROW, FAMILY, VALUE); + + // Flush and try again + + flushMemStore(TABLE2); + + getTestNull(ht, ROW, FAMILY, VALUE); + + scanTestNull(ht, ROW, FAMILY, VALUE); + + Delete delete = new Delete(ROW); + delete.deleteColumns(FAMILY, EMPTY); + ht.delete(delete); + + Get get = new Get(ROW); + Result result = ht.get(get); + assertEmptyResult(result); + + } catch(Exception e) { + e.printStackTrace(); + assertTrue("Using a row with null qualifier threw exception, should " + + "pass", false); + } + + // Null value + try { + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, null); + ht.put(put); + + Get get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + Result result = ht.get(get); + assertSingleResult(result, ROW, FAMILY, QUALIFIER, null); + + Scan scan = new Scan(); + scan.addColumn(FAMILY, QUALIFIER); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROW, FAMILY, QUALIFIER, null); + + Delete delete = new Delete(ROW); + delete.deleteColumns(FAMILY, QUALIFIER); + ht.delete(delete); + + get = new Get(ROW); + result = ht.get(get); + assertEmptyResult(result); + + } catch(Exception e) { + e.printStackTrace(); + assertTrue("Null values should be allowed, but threw exception", + false); + } + + } + + public void testVersions() throws Exception { + + byte [] TABLE = Bytes.toBytes("testSimpleVersions"); + + long [] STAMPS = makeStamps(20); + byte [][] VALUES = makeNAscii(VALUE, 20); + + HTable ht = createTable(TABLE, FAMILY, 10); + + // Insert 4 versions of same column + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + put.add(FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + put.add(FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + put.add(FAMILY, QUALIFIER, STAMPS[5], VALUES[5]); + ht.put(put); + + // 
Verify we can get each one properly + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[5], VALUES[5]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[5], VALUES[5]); + + // Verify we don't accidentally get others + getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[0]); + getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[3]); + getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[6]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[0]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[3]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[6]); + + // Ensure maxVersions in query is respected + Get get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(2); + Result result = ht.get(get); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[4], STAMPS[5]}, + new byte[][] {VALUES[4], VALUES[5]}, + 0, 1); + + Scan scan = new Scan(ROW); + scan.addColumn(FAMILY, QUALIFIER); + scan.setMaxVersions(2); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[4], STAMPS[5]}, + new byte[][] {VALUES[4], VALUES[5]}, + 0, 1); + + // Flush and redo + + flushMemStore(TABLE); + + // Verify we can get each one properly + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + getVersionAndVerify(ht, ROW, 
FAMILY, QUALIFIER, STAMPS[5], VALUES[5]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[5], VALUES[5]); + + // Verify we don't accidentally get others + getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[0]); + getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[3]); + getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[6]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[0]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[3]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[6]); + + // Ensure maxVersions in query is respected + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(2); + result = ht.get(get); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[4], STAMPS[5]}, + new byte[][] {VALUES[4], VALUES[5]}, + 0, 1); + + scan = new Scan(ROW); + scan.addColumn(FAMILY, QUALIFIER); + scan.setMaxVersions(2); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[4], STAMPS[5]}, + new byte[][] {VALUES[4], VALUES[5]}, + 0, 1); + + + // Add some memstore and retest + + // Insert 4 more versions of same column and a dupe + put = new Put(ROW); + put.add(FAMILY, QUALIFIER, STAMPS[3], VALUES[3]); + put.add(FAMILY, QUALIFIER, STAMPS[6], VALUES[6]); + put.add(FAMILY, QUALIFIER, STAMPS[7], VALUES[7]); + put.add(FAMILY, QUALIFIER, STAMPS[8], VALUES[8]); + ht.put(put); + + // Ensure maxVersions in query is respected + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(); + result = ht.get(get); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[7], 
STAMPS[8]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[7], VALUES[8]}, + 0, 7); + + scan = new Scan(ROW); + scan.addColumn(FAMILY, QUALIFIER); + scan.setMaxVersions(); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[7], STAMPS[8]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[7], VALUES[8]}, + 0, 7); + + get = new Get(ROW); + get.setMaxVersions(); + result = ht.get(get); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[7], STAMPS[8]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[7], VALUES[8]}, + 0, 7); + + scan = new Scan(ROW); + scan.setMaxVersions(); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[7], STAMPS[8]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[7], VALUES[8]}, + 0, 7); + + // Verify we can get each one properly + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[7], VALUES[7]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + scanVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS[7], VALUES[7]); + + // Verify we don't accidentally get others + getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[0]); + getVersionAndVerifyMissing(ht, 
ROW, FAMILY, QUALIFIER, STAMPS[9]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[0]); + scanVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, STAMPS[9]); + + // Ensure maxVersions of table is respected + + flushMemStore(TABLE); + + // Insert 4 more versions of same column and a dupe + put = new Put(ROW); + put.add(FAMILY, QUALIFIER, STAMPS[9], VALUES[9]); + put.add(FAMILY, QUALIFIER, STAMPS[11], VALUES[11]); + put.add(FAMILY, QUALIFIER, STAMPS[13], VALUES[13]); + put.add(FAMILY, QUALIFIER, STAMPS[15], VALUES[15]); + ht.put(put); + + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[7], STAMPS[8], STAMPS[9], STAMPS[11], STAMPS[13], STAMPS[15]}, + new byte[][] {VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[7], VALUES[8], VALUES[9], VALUES[11], VALUES[13], VALUES[15]}, + 0, 9); + + scan = new Scan(ROW); + scan.addColumn(FAMILY, QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[7], STAMPS[8], STAMPS[9], STAMPS[11], STAMPS[13], STAMPS[15]}, + new byte[][] {VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[7], VALUES[8], VALUES[9], VALUES[11], VALUES[13], VALUES[15]}, + 0, 9); + + // Delete a version in the memstore and a version in a storefile + Delete delete = new Delete(ROW); + delete.deleteColumn(FAMILY, QUALIFIER, STAMPS[11]); + delete.deleteColumn(FAMILY, QUALIFIER, STAMPS[7]); + ht.delete(delete); + + // Test that it's gone + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], 
STAMPS[13], STAMPS[15]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]}, + 0, 9); + + scan = new Scan(ROW); + scan.addColumn(FAMILY, QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILY, QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]}, + 0, 9); + + } + + public void testVersionLimits() throws Exception { + byte [] TABLE = Bytes.toBytes("testVersionLimits"); + byte [][] FAMILIES = makeNAscii(FAMILY, 3); + int [] LIMITS = {1,3,5}; + long [] STAMPS = makeStamps(10); + byte [][] VALUES = makeNAscii(VALUE, 10); + HTable ht = createTable(TABLE, FAMILIES, LIMITS); + + // Insert limit + 1 on each family + Put put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, STAMPS[0], VALUES[0]); + put.add(FAMILIES[0], QUALIFIER, STAMPS[1], VALUES[1]); + put.add(FAMILIES[1], QUALIFIER, STAMPS[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, STAMPS[1], VALUES[1]); + put.add(FAMILIES[1], QUALIFIER, STAMPS[2], VALUES[2]); + put.add(FAMILIES[1], QUALIFIER, STAMPS[3], VALUES[3]); + put.add(FAMILIES[2], QUALIFIER, STAMPS[0], VALUES[0]); + put.add(FAMILIES[2], QUALIFIER, STAMPS[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, STAMPS[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, STAMPS[3], VALUES[3]); + put.add(FAMILIES[2], QUALIFIER, STAMPS[4], VALUES[4]); + put.add(FAMILIES[2], QUALIFIER, STAMPS[5], VALUES[5]); + put.add(FAMILIES[2], QUALIFIER, STAMPS[6], VALUES[6]); + ht.put(put); + + // Verify we only get the right number out of each + + // Family0 + + Get get = new Get(ROW); + get.addColumn(FAMILIES[0], QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = ht.get(get); + assertNResult(result, 
ROW, FAMILIES[0], QUALIFIER, + new long [] {STAMPS[1]}, + new byte[][] {VALUES[1]}, + 0, 0); + + get = new Get(ROW); + get.addFamily(FAMILIES[0]); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, + new long [] {STAMPS[1]}, + new byte[][] {VALUES[1]}, + 0, 0); + + Scan scan = new Scan(ROW); + scan.addColumn(FAMILIES[0], QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, + new long [] {STAMPS[1]}, + new byte[][] {VALUES[1]}, + 0, 0); + + scan = new Scan(ROW); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, + new long [] {STAMPS[1]}, + new byte[][] {VALUES[1]}, + 0, 0); + + // Family1 + + get = new Get(ROW); + get.addColumn(FAMILIES[1], QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertNResult(result, ROW, FAMILIES[1], QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, + 0, 2); + + get = new Get(ROW); + get.addFamily(FAMILIES[1]); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertNResult(result, ROW, FAMILIES[1], QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, + 0, 2); + + scan = new Scan(ROW); + scan.addColumn(FAMILIES[1], QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[1], QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], STAMPS[3]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, + 0, 2); + + scan = new Scan(ROW); + scan.addFamily(FAMILIES[1]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[1], QUALIFIER, + new long [] {STAMPS[1], STAMPS[2], 
STAMPS[3]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, + 0, 2); + + // Family2 + + get = new Get(ROW); + get.addColumn(FAMILIES[2], QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertNResult(result, ROW, FAMILIES[2], QUALIFIER, + new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]}, + new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]}, + 0, 4); + + get = new Get(ROW); + get.addFamily(FAMILIES[2]); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertNResult(result, ROW, FAMILIES[2], QUALIFIER, + new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]}, + new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]}, + 0, 4); + + scan = new Scan(ROW); + scan.addColumn(FAMILIES[2], QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[2], QUALIFIER, + new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]}, + new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]}, + 0, 4); + + scan = new Scan(ROW); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[2], QUALIFIER, + new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]}, + new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]}, + 0, 4); + + // Try all families + + get = new Get(ROW); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertTrue("Expected 9 keys but received " + result.size(), + result.size() == 9); + + get = new Get(ROW); + get.addFamily(FAMILIES[0]); + get.addFamily(FAMILIES[1]); + get.addFamily(FAMILIES[2]); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + assertTrue("Expected 9 keys but received " + result.size(), + result.size() == 9); + + get = new Get(ROW); + get.addColumn(FAMILIES[0], QUALIFIER); + 
// Tail of the multi-family versions test: the Get/Scan below each select all
// three families (explicitly, by family, or with no selector) and every
// assertion expects the same 9 stored versions.
get.addColumn(FAMILIES[1], QUALIFIER);
get.addColumn(FAMILIES[2], QUALIFIER);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertTrue("Expected 9 keys but received " + result.size(),
    result.size() == 9);

scan = new Scan(ROW);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertTrue("Expected 9 keys but received " + result.size(),
    result.size() == 9);

scan = new Scan(ROW);
scan.setMaxVersions(Integer.MAX_VALUE);
scan.addFamily(FAMILIES[0]);
scan.addFamily(FAMILIES[1]);
scan.addFamily(FAMILIES[2]);
result = getSingleScanResult(ht, scan);
assertTrue("Expected 9 keys but received " + result.size(),
    result.size() == 9);

scan = new Scan(ROW);
scan.setMaxVersions(Integer.MAX_VALUE);
scan.addColumn(FAMILIES[0], QUALIFIER);
scan.addColumn(FAMILIES[1], QUALIFIER);
scan.addColumn(FAMILIES[2], QUALIFIER);
result = getSingleScanResult(ht, scan);
assertTrue("Expected 9 keys but received " + result.size(),
    result.size() == 9);

}

/**
 * Exercises Delete semantics against Gets and Scans: family deletes bounded
 * by a timestamp, deleteColumn (latest version), deleteColumns (all
 * versions), and re-putting values under previously deleted cells.
 * Assertions are cross-checked with both Get and Scan after each mutation.
 */
public void testDeletes() throws Exception {

  byte [] TABLE = Bytes.toBytes("testDeletes");

  byte [][] ROWS = makeNAscii(ROW, 6);
  byte [][] FAMILIES = makeN(FAMILY, 3);
  byte [][] VALUES = makeN(VALUE, 5);
  long [] ts = {1000, 2000, 3000, 4000, 5000};

  HTable ht = createTable(TABLE, FAMILIES);

  Put put = new Put(ROW);
  put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
  put.add(FAMILIES[0], QUALIFIER, ts[1], VALUES[1]);
  ht.put(put);

  // Family delete bounded at ts[0] removes only the ts[0] version;
  // the ts[1] version must survive.
  Delete delete = new Delete(ROW);
  delete.deleteFamily(FAMILIES[0], ts[0]);
  ht.delete(delete);

  Get get = new Get(ROW);
  get.addFamily(FAMILIES[0]);
  get.setMaxVersions(Integer.MAX_VALUE);
  Result result = ht.get(get);
  assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
      new long [] {ts[1]},
      new byte[][] {VALUES[1]},
      0, 0);

  Scan scan = new Scan(ROW);
  scan.addFamily(FAMILIES[0]);
  scan.setMaxVersions(Integer.MAX_VALUE);
  result = getSingleScanResult(ht, scan);
  assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
      new long [] {ts[1]},
      new byte[][] {VALUES[1]},
      0, 0);

  // Test delete latest version
  put = new Put(ROW);
  put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
  put.add(FAMILIES[0], QUALIFIER, ts[2], VALUES[2]);
  put.add(FAMILIES[0], QUALIFIER, ts[3], VALUES[3]);
  ht.put(put);

  // deleteColumn with no timestamp deletes only the newest version (ts[4]).
  delete = new Delete(ROW);
  delete.deleteColumn(FAMILIES[0], QUALIFIER);
  ht.delete(delete);

  get = new Get(ROW);
  get.addFamily(FAMILIES[0]);
  get.setMaxVersions(Integer.MAX_VALUE);
  result = ht.get(get);
  assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
      new long [] {ts[1], ts[2], ts[3]},
      new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
      0, 2);

  scan = new Scan(ROW);
  scan.addFamily(FAMILIES[0]);
  scan.setMaxVersions(Integer.MAX_VALUE);
  result = getSingleScanResult(ht, scan);
  assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
      new long [] {ts[1], ts[2], ts[3]},
      new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
      0, 2);

  // Expected client behavior might be that you can re-put deleted values
  // But alas, this is not to be. We can't put them back in either case.

  put = new Put(ROW);
  put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
  put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
  ht.put(put);

  // The Get returns the latest value but then does not return the
  // oldest, which was never deleted, ts[1].

  get = new Get(ROW);
  get.addFamily(FAMILIES[0]);
  get.setMaxVersions(Integer.MAX_VALUE);
  result = ht.get(get);
  assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
      new long [] {ts[2], ts[3], ts[4]},
      new byte[][] {VALUES[2], VALUES[3], VALUES[4]},
      0, 2);

  // The Scanner returns the previous values, the expected-unexpected behavior

  scan = new Scan(ROW);
  scan.addFamily(FAMILIES[0]);
  scan.setMaxVersions(Integer.MAX_VALUE);
  result = getSingleScanResult(ht, scan);
  assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
      new long [] {ts[1], ts[2], ts[3]},
      new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
      0, 2);

  // Test deleting an entire family from one row but not the other various ways

  put = new Put(ROWS[0]);
  put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
  put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
  put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
  put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
  ht.put(put);

  put = new Put(ROWS[1]);
  put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
  put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
  put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
  put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
  ht.put(put);

  put = new Put(ROWS[2]);
  put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
  put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
  put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
  put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
  ht.put(put);

  delete = new Delete(ROWS[0]);
  delete.deleteFamily(FAMILIES[2]);
  ht.delete(delete);

  delete = new Delete(ROWS[1]);
  delete.deleteColumns(FAMILIES[1], QUALIFIER);
  ht.delete(delete);

  // Deliberate duplicate deleteColumn: each call targets the latest
  // remaining version, so two calls remove both versions of FAMILIES[1]
  // (the assertion below expects only FAMILIES[2]/ts[2] to survive).
  delete = new Delete(ROWS[2]);
  delete.deleteColumn(FAMILIES[1], QUALIFIER);
  delete.deleteColumn(FAMILIES[1], QUALIFIER);
  delete.deleteColumn(FAMILIES[2], QUALIFIER);
  ht.delete(delete);

  get = new Get(ROWS[0]);
  get.addFamily(FAMILIES[1]);
  get.addFamily(FAMILIES[2]);
  get.setMaxVersions(Integer.MAX_VALUE);
  result = ht.get(get);
  assertTrue("Expected 2 keys but received " + result.size(),
      result.size() == 2);
  assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER,
      new long [] {ts[0], ts[1]},
      new byte[][] {VALUES[0], VALUES[1]},
      0, 1);

  scan = new Scan(ROWS[0]);
  scan.addFamily(FAMILIES[1]);
  scan.addFamily(FAMILIES[2]);
  scan.setMaxVersions(Integer.MAX_VALUE);
  result = getSingleScanResult(ht, scan);
  assertTrue("Expected 2 keys but received " + result.size(),
      result.size() == 2);
  assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER,
      new long [] {ts[0], ts[1]},
      new byte[][] {VALUES[0], VALUES[1]},
      0, 1);

  get = new Get(ROWS[1]);
  get.addFamily(FAMILIES[1]);
  get.addFamily(FAMILIES[2]);
  get.setMaxVersions(Integer.MAX_VALUE);
  result = ht.get(get);
  assertTrue("Expected 2 keys but received " + result.size(),
      result.size() == 2);

  scan = new Scan(ROWS[1]);
  scan.addFamily(FAMILIES[1]);
  scan.addFamily(FAMILIES[2]);
  scan.setMaxVersions(Integer.MAX_VALUE);
  result = getSingleScanResult(ht, scan);
  assertTrue("Expected 2 keys but received " + result.size(),
      result.size() == 2);

  get = new Get(ROWS[2]);
  get.addFamily(FAMILIES[1]);
  get.addFamily(FAMILIES[2]);
  get.setMaxVersions(Integer.MAX_VALUE);
  result = ht.get(get);
  assertTrue("Expected 1 key but received " + result.size(),
      result.size() == 1);
  assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
      new long [] {ts[2]},
      new byte[][] {VALUES[2]},
      0, 0);

  scan = new Scan(ROWS[2]);
  scan.addFamily(FAMILIES[1]);
  scan.addFamily(FAMILIES[2]);
  scan.setMaxVersions(Integer.MAX_VALUE);
  result = getSingleScanResult(ht, scan);
  assertTrue("Expected 1 key but received " + result.size(),
      result.size() == 1);
  assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
      new long [] {ts[2]},
      new byte[][] {VALUES[2]},
      0, 0);

  // Test if we delete the family first in one row (HBASE-1541)

  delete = new
Delete(ROWS[3]);
delete.deleteFamily(FAMILIES[1]);
ht.delete(delete);

put = new Put(ROWS[3]);
put.add(FAMILIES[2], QUALIFIER, VALUES[0]);
ht.put(put);

put = new Put(ROWS[4]);
put.add(FAMILIES[1], QUALIFIER, VALUES[1]);
put.add(FAMILIES[2], QUALIFIER, VALUES[2]);
ht.put(put);

get = new Get(ROWS[3]);
get.addFamily(FAMILIES[1]);
get.addFamily(FAMILIES[2]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertTrue("Expected 1 key but received " + result.size(),
    result.size() == 1);

get = new Get(ROWS[4]);
get.addFamily(FAMILIES[1]);
get.addFamily(FAMILIES[2]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertTrue("Expected 2 keys but received " + result.size(),
    result.size() == 2);

// Scanner starting at ROWS[3] must see the ROWS[3] cell first, then both
// ROWS[4] cells, in sorted order.
scan = new Scan(ROWS[3]);
scan.addFamily(FAMILIES[1]);
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
ResultScanner scanner = ht.getScanner(scan);
result = scanner.next();
assertTrue("Expected 1 key but received " + result.size(),
    result.size() == 1);
assertTrue(Bytes.equals(result.sorted()[0].getRow(), ROWS[3]));
assertTrue(Bytes.equals(result.sorted()[0].getValue(), VALUES[0]));
result = scanner.next();
assertTrue("Expected 2 keys but received " + result.size(),
    result.size() == 2);
assertTrue(Bytes.equals(result.sorted()[0].getRow(), ROWS[4]));
assertTrue(Bytes.equals(result.sorted()[1].getRow(), ROWS[4]));
assertTrue(Bytes.equals(result.sorted()[0].getValue(), VALUES[1]));
assertTrue(Bytes.equals(result.sorted()[1].getValue(), VALUES[2]));
scanner.close();
}

/**
 * Populates a single-region table, splits it, then verifies the sequential
 * and the parallel scanner both return the full row count.
 */
public void testParallelScanners() throws Exception {
  byte[] tableName = Bytes.toBytes("testParallelScanners");
  byte[] columnName = Bytes.toBytes("a:");
  // create the test table
  HTableDescriptor htd = new HTableDescriptor(tableName);
  htd.addFamily(new HColumnDescriptor(columnName));
  HBaseAdmin admin = new HBaseAdmin(conf);
  admin.createTable(htd);
  // NOTE(review): this pool is never shut down; its non-daemon threads can
  // outlive the test method — consider executorService.shutdown() at the end.
  ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

  final ParallelHTable table = new ParallelHTable(conf, tableName, executorService);

  int rowCount = populateTable(table, columnName);

  // get the initial layout (should just be one region)
  Map m = table.getRegionsInfo();
  System.out.println("Initial regions (" + m.size() + "): " + m);
  assertTrue(m.size() == 1);

  splitTable(table, admin);

  // Verify row count
  Scan scan = new Scan();
  int rows = countRows(table, scan, false);
  assertEquals(rowCount, rows);

  // scan and verify that we get the correct number of results using the parallel scanners
  scan = new Scan();
  rows = countRows(table, scan, true);
  assertEquals(rowCount, rows);
}

/**
 * Same as testParallelScanners but with a start row picked 1000 rows in,
 * verifying parallel and sequential scanners agree on the remaining count.
 */
public void testParallelScannerWithStartRow() throws Exception {
  byte[] tableName = Bytes.toBytes("testParallelScannerWithStartRow");
  byte[] columnName = Bytes.toBytes("a:");
  // create the test table
  HTableDescriptor htd = new HTableDescriptor(tableName);
  htd.addFamily(new HColumnDescriptor(columnName));
  HBaseAdmin admin = new HBaseAdmin(conf);
  admin.createTable(htd);
  ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

  final ParallelHTable table = new ParallelHTable(conf, tableName, executorService);

  int rowCount = populateTable(table, columnName);

  // get the initial layout (should just be one region)
  Map m = table.getRegionsInfo();
  System.out.println("Initial regions (" + m.size() + "): " + m);
  assertTrue(m.size() == 1);
  splitTable(table, admin);

  // Verify total row count and find a start key
  Scan scan = new Scan();
  ResultScanner scanner = table.getScanner(scan, false);
  int rows = 0;
  byte[] startRow = null;
  int startRowOffset = 1000;
  for(Result result : scanner) {
    rows++;
    if (rows == startRowOffset) {
      startRow = result.getRow();
    }
  }
  scanner.close();
  assertEquals(rowCount, rows);

  // get the number of rows returned with using
// the non-parallel scanner
Scan scanWithStartRow = new Scan();
scanWithStartRow.setStartRow(startRow);
int rowsWithStartRow = countRows(table, scanWithStartRow, false);
// startRow is the 1000th row and is itself included, hence the -1.
assertEquals(rowCount - (startRowOffset - 1), rowsWithStartRow);

// scan and verify that we get the correct number of results using the parallel scanners
Scan scanWithStartRowParallel = new Scan();
scanWithStartRowParallel.setStartRow(startRow);
int rowsWithStartRowParallel = countRows(table, scanWithStartRowParallel, true);
assertEquals(rowsWithStartRow, rowsWithStartRowParallel);
}

/**
 * Same scheme with a stop row picked 10000 rows in: the sequential and the
 * parallel scanner must agree on the count of rows before the stop row.
 */
public void testParallelScannerWithStopRow() throws Exception {
  byte[] tableName = Bytes.toBytes("testParallelScannerWithStopRow");
  byte[] columnName = Bytes.toBytes("a:");
  // create the test table
  HTableDescriptor htd = new HTableDescriptor(tableName);
  htd.addFamily(new HColumnDescriptor(columnName));
  HBaseAdmin admin = new HBaseAdmin(conf);
  admin.createTable(htd);
  ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

  final ParallelHTable table = new ParallelHTable(conf, tableName, executorService);

  int rowCount = populateTable(table, columnName);

  // get the initial layout (should just be one region)
  Map m = table.getRegionsInfo();
  System.out.println("Initial regions (" + m.size() + "): " + m);
  assertTrue(m.size() == 1);
  splitTable(table, admin);

  // Verify total row count and find a start key
  int stopRowOffset = 10000;
  Scan scan = new Scan();
  ResultScanner scanner = table.getScanner(scan, false);
  int rows = 0;
  byte[] stopRow = null;
  for(Result result : scanner) {
    rows++;
    if (rows == stopRowOffset) {
      stopRow = result.getRow();
    }
  }
  scanner.close();
  assertEquals(rowCount, rows);

  // get the number of rows returned with using the non-parallel scanner
  Scan scanWithStartRow = new Scan();
  scanWithStartRow.setStopRow(stopRow);
  int rowsWithStopRow = countRows(table, scanWithStartRow, false);
  // The stop row is exclusive: expect stopRowOffset - 1 rows.
  assertEquals(rowCount - (rowCount - (stopRowOffset - 1)), rowsWithStopRow);

  // scan and verify that we get the correct number of results using the parallel scanners
  Scan scanWithStopRowParallel = new Scan();
  scanWithStopRowParallel.setStopRow(stopRow);
  int rowsWithStartRowParallel = countRows(table, scanWithStopRowParallel, true);
  assertEquals(rowsWithStopRow, rowsWithStartRowParallel);
}

/**
 * Combined start+stop row variant: the window [startRow, stopRow) must hold
 * exactly stopRowOffset - startRowOffset rows in both scan modes.
 */
public void testParallelScannerWithStartAndStopRow() throws Exception {
  byte[] tableName = Bytes.toBytes("testParallelScannerWithStartAndStopRow");
  byte[] columnName = Bytes.toBytes("a:");
  // create the test table
  HTableDescriptor htd = new HTableDescriptor(tableName);
  htd.addFamily(new HColumnDescriptor(columnName));
  HBaseAdmin admin = new HBaseAdmin(conf);
  admin.createTable(htd);
  ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

  final ParallelHTable table = new ParallelHTable(conf, tableName, executorService);

  int rowCount = populateTable(table, columnName);

  // get the initial layout (should just be one region)
  Map m = table.getRegionsInfo();
  System.out.println("Initial regions (" + m.size() + "): " + m);
  assertTrue(m.size() == 1);
  splitTable(table, admin);

  // Verify total row count and find a start key
  int startRowOffset = 1000;
  int stopRowOffset = 10000;
  Scan scan = new Scan();
  ResultScanner scanner = table.getScanner(scan, false);
  int rows = 0;
  byte[] startRow = null;
  byte[] stopRow = null;
  for(Result result : scanner) {
    rows++;
    if (rows == startRowOffset) {
      startRow = result.getRow();
    }
    if (rows == stopRowOffset) {
      stopRow = result.getRow();
    }
  }
  scanner.close();
  assertEquals(rowCount, rows);

  // get the number of rows returned with using the non-parallel scanner
  Scan scanWithStartRow = new Scan();
  scanWithStartRow.setStartRow(startRow);
  scanWithStartRow.setStopRow(stopRow);
  int rowsWithStopRow = countRows(table, scanWithStartRow,
false); + assertEquals(stopRowOffset - startRowOffset, rowsWithStopRow); + + // scan and verify that we get the correct number of results using the parallel scanners + Scan scanWithStopRowParallel = new Scan(); + scanWithStopRowParallel.setStartRow(startRow); + scanWithStopRowParallel.setStopRow(stopRow); + int rowsWithStartRowParallel = countRows(table, scanWithStopRowParallel, true); + assertEquals(rowsWithStopRow, rowsWithStartRowParallel); + } + + private int countRows(ParallelHTable table, Scan scan, boolean scanInParallel) throws IOException { + ResultScanner scanner = table.getScanner(scan, scanInParallel); + if (scanInParallel) Assert.assertEquals("The incorrect scanner was returned", ParallelClientScanner.class, scanner.getClass()); + int rows = 0; + for(Result result : scanner) { + rows++; + } + scanner.close(); + return rows; + } + + public void testParallelScannersWithSplit() throws Exception { + byte[] tableName = Bytes.toBytes("testParallelScannersWithSplit"); + byte[] columnName = Bytes.toBytes("a:"); + // create the test table + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(columnName)); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(htd); + final ParallelHTable table = new ParallelHTable(conf, tableName, Executors.newFixedThreadPool(2)); + + int rowCount = populateTable(table, columnName); + + // get the initial layout (should just be one region) + Map m = table.getRegionsInfo(); + System.out.println("Initial regions (" + m.size() + "): " + m); + assertTrue(m.size() == 1); + + // open the scanner before the split to ensure the appropriate exception is thrown + Scan scan = new Scan(); + ResultScanner scanner = table.getScanner(scan, true); + Assert.assertEquals("The ParallelClientScanner isn't being used", ParallelClientScanner.class, scanner.getClass()); + int rows = 0; // We counted one row above. 
+ while(rows < 2000) { + scanner.next(); + rows++; + } + + splitTable(table, admin); + + // Verify row count + try { + while (scanner.next() != null) { + rows++; + } + assertEquals(rowCount, rows); + + Assert.fail("An exception should have been thrown!"); + } catch (Exception e) { + Assert.assertEquals("Wrong exception thrown!", NotServingRegionException.class, e.getClass()); + } + scanner.close(); + } + + private int populateTable(HTable table, byte[] columnName) throws IOException { + byte[] k = new byte[3]; + int rowCount = 0; + for (byte b1 = 'a'; b1 < 'z'; b1++) { + for (byte b2 = 'a'; b2 < 'z'; b2++) { + for (byte b3 = 'a'; b3 < 'z'; b3++) { + k[0] = b1; + k[1] = b2; + k[2] = b3; + Put put = new Put(k); + byte [][] famAndQf = KeyValue.parseColumn(columnName); + put.add(famAndQf[0], famAndQf[1], k); + table.put(put); + rowCount++; + } + } + } + return rowCount; + } + + private void splitTable(final HTable table, HBaseAdmin admin) throws IOException, InterruptedException { + byte[] tableName = table.getTableName(); + final AtomicInteger initialCount = new AtomicInteger(table.getRegionsInfo().size()); + Thread t = new Thread("CheckForSplit") { + public void run() { + for (int i = 0; i < 20; i++) { + try { + sleep(1000); + } catch (InterruptedException e) { + continue; + } + // check again table = new HTable(conf, tableName); + Map regions = null; + try { + regions = table.getRegionsInfo(); + if (regions.size() > initialCount.get()) { + break; + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + }; + t.start(); + // tell the master to split the table + admin.split(Bytes.toString(tableName)); + t.join(); + } + + /** + * Baseline "scalability" test. 
 *
 * Tests one hundred families, one million columns, one million versions
 */
public void XtestMillions() throws Exception {

  // 100 families

  // millions of columns

  // millions of versions

}

// Disabled placeholder (X prefix keeps JUnit 3 from running it): outline
// for a multi-region / batched-put test.
public void XtestMultipleRegionsAndBatchPuts() throws Exception {
  // Two family table

  // Insert lots of rows

  // Insert to the same row with batched puts

  // Insert to multiple rows with batched puts

  // Split the table

  // Get row from first region

  // Get row from second region

  // Scan all rows

  // Insert to multiple regions with batched puts

  // Get row from first region

  // Get row from second region

  // Scan all rows


}

// Disabled placeholder — not yet implemented.
public void XtestMultipleRowMultipleFamily() throws Exception {

}

/**
 * Explicitly test JIRAs related to HBASE-880 / Client API
 */
public void testJIRAs() throws Exception {
  jiraTest867();
  jiraTest861();
  jiraTest33();
  jiraTest1014();
  jiraTest1182();
  jiraTest52();
}

//
// JIRA Testers
//

/**
 * HBASE-867
 * If millions of columns in a column family, hbase scanner won't come up
 *
 * Test will create numRows rows, each with numColsPerRow columns
 * (1 version each), and attempt to scan them all.
+ * + * To test at scale, up numColsPerRow to the millions + * (have not gotten that to work running as junit though) + */ + private void jiraTest867() throws Exception { + + int numRows = 10; + int numColsPerRow = 2000; + + byte [] TABLE = Bytes.toBytes("jiraTest867"); + + byte [][] ROWS = makeN(ROW, numRows); + byte [][] QUALIFIERS = makeN(QUALIFIER, numColsPerRow); + + HTable ht = createTable(TABLE, FAMILY); + + // Insert rows + + for(int i=0;i some timestamp + */ + private void jiraTest1182() throws Exception { + + byte [] TABLE = Bytes.toBytes("jiraTest1182"); + byte [][] VALUES = makeNAscii(VALUE, 7); + long [] STAMPS = makeStamps(7); + + HTable ht = createTable(TABLE, FAMILY, 10); + + // Insert lots versions + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, STAMPS[0], VALUES[0]); + put.add(FAMILY, QUALIFIER, STAMPS[1], VALUES[1]); + put.add(FAMILY, QUALIFIER, STAMPS[2], VALUES[2]); + put.add(FAMILY, QUALIFIER, STAMPS[3], VALUES[3]); + put.add(FAMILY, QUALIFIER, STAMPS[4], VALUES[4]); + put.add(FAMILY, QUALIFIER, STAMPS[5], VALUES[5]); + ht.put(put); + + getVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5); + getVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 2, 5); + getVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 4, 5); + + scanVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5); + scanVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 2, 5); + scanVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 4, 5); + + // Try same from storefile + flushMemStore(TABLE); + + getVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5); + getVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 2, 5); + getVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 4, 5); + + scanVersionRangeAndVerifyGreaterThan(ht, ROW, 
FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5);
scanVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 2, 5);
scanVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 4, 5);


}

/**
 * HBASE-52
 * Add a means of scanning over all versions
 */
private void jiraTest52() throws Exception {

  byte [] TABLE = Bytes.toBytes("jiraTest52");
  byte [][] VALUES = makeNAscii(VALUE, 7);
  long [] STAMPS = makeStamps(7);

  HTable ht = createTable(TABLE, FAMILY, 10);

  // Insert lots versions

  Put put = new Put(ROW);
  put.add(FAMILY, QUALIFIER, STAMPS[0], VALUES[0]);
  put.add(FAMILY, QUALIFIER, STAMPS[1], VALUES[1]);
  put.add(FAMILY, QUALIFIER, STAMPS[2], VALUES[2]);
  put.add(FAMILY, QUALIFIER, STAMPS[3], VALUES[3]);
  put.add(FAMILY, QUALIFIER, STAMPS[4], VALUES[4]);
  put.add(FAMILY, QUALIFIER, STAMPS[5], VALUES[5]);
  ht.put(put);

  getAllVersionsAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5);

  scanAllVersionsAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5);

  // Try same from storefile
  flushMemStore(TABLE);

  getAllVersionsAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5);

  scanAllVersionsAndVerify(ht, ROW, FAMILY, QUALIFIER, STAMPS, VALUES, 0, 5);


}

//
// Bulk Testers
//

/**
 * Gets all versions with timestamp strictly greater than stamps[start]
 * (time range [stamps[start+1], Long.MAX_VALUE)) and verifies
 * stamps/values[start+1..end] come back.
 */
private void getVersionRangeAndVerifyGreaterThan(HTable ht, byte [] row,
    byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
    int start, int end)
throws IOException {
  Get get = new Get(row);
  get.addColumn(family, qualifier);
  get.setMaxVersions(Integer.MAX_VALUE);
  get.setTimeRange(stamps[start+1], Long.MAX_VALUE);
  Result result = ht.get(get);
  assertNResult(result, row, family, qualifier, stamps, values, start+1, end);
}

/**
 * Gets versions in the inclusive range [stamps[start], stamps[end]]
 * (+1 because setTimeRange's upper bound is exclusive) and verifies them.
 */
private void getVersionRangeAndVerify(HTable ht, byte [] row, byte [] family,
    byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
throws IOException {
  Get get = new Get(row);
  get.addColumn(family, qualifier);
  get.setMaxVersions(Integer.MAX_VALUE);
  get.setTimeRange(stamps[start], stamps[end]+1);
  Result result = ht.get(get);
  assertNResult(result, row, family, qualifier, stamps, values, start, end);
}

/** Gets every stored version of the column and verifies stamps/values[start..end]. */
private void getAllVersionsAndVerify(HTable ht, byte [] row, byte [] family,
    byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
throws IOException {
  Get get = new Get(row);
  get.addColumn(family, qualifier);
  get.setMaxVersions(Integer.MAX_VALUE);
  Result result = ht.get(get);
  assertNResult(result, row, family, qualifier, stamps, values, start, end);
}

/** Scan counterpart of getVersionRangeAndVerifyGreaterThan. */
private void scanVersionRangeAndVerifyGreaterThan(HTable ht, byte [] row,
    byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
    int start, int end)
throws IOException {
  Scan scan = new Scan(row);
  scan.addColumn(family, qualifier);
  scan.setMaxVersions(Integer.MAX_VALUE);
  scan.setTimeRange(stamps[start+1], Long.MAX_VALUE);
  Result result = getSingleScanResult(ht, scan);
  assertNResult(result, row, family, qualifier, stamps, values, start+1, end);
}

/** Scan counterpart of getVersionRangeAndVerify. */
private void scanVersionRangeAndVerify(HTable ht, byte [] row, byte [] family,
    byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
throws IOException {
  Scan scan = new Scan(row);
  scan.addColumn(family, qualifier);
  scan.setMaxVersions(Integer.MAX_VALUE);
  scan.setTimeRange(stamps[start], stamps[end]+1);
  Result result = getSingleScanResult(ht, scan);
  assertNResult(result, row, family, qualifier, stamps, values, start, end);
}

/** Scan counterpart of getAllVersionsAndVerify. */
private void scanAllVersionsAndVerify(HTable ht, byte [] row, byte [] family,
    byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
throws IOException {
  Scan scan = new Scan(row);
  scan.addColumn(family, qualifier);
  scan.setMaxVersions(Integer.MAX_VALUE);
  Result result = getSingleScanResult(ht, scan);
  assertNResult(result, row, family, qualifier, stamps, values, start, end);
}

/** Gets exactly the version at {@code stamp} and verifies its value. */
private void getVersionAndVerify(HTable ht, byte [] row, byte [] family,
    byte [] qualifier, long stamp, byte [] value)
throws Exception {
  Get get = new Get(row);
  get.addColumn(family, qualifier);
  get.setTimeStamp(stamp);
  get.setMaxVersions(Integer.MAX_VALUE);
  Result result = ht.get(get);
  assertSingleResult(result, row, family, qualifier, stamp, value);
}

/** Verifies no version of the column exists at exactly {@code stamp}. */
private void getVersionAndVerifyMissing(HTable ht, byte [] row, byte [] family,
    byte [] qualifier, long stamp)
throws Exception {
  Get get = new Get(row);
  get.addColumn(family, qualifier);
  get.setTimeStamp(stamp);
  get.setMaxVersions(Integer.MAX_VALUE);
  Result result = ht.get(get);
  assertEmptyResult(result);
}

/** Scan counterpart of getVersionAndVerify. */
private void scanVersionAndVerify(HTable ht, byte [] row, byte [] family,
    byte [] qualifier, long stamp, byte [] value)
throws Exception {
  Scan scan = new Scan(row);
  scan.addColumn(family, qualifier);
  scan.setTimeStamp(stamp);
  scan.setMaxVersions(Integer.MAX_VALUE);
  Result result = getSingleScanResult(ht, scan);
  assertSingleResult(result, row, family, qualifier, stamp, value);
}

/** Scan counterpart of getVersionAndVerifyMissing. */
private void scanVersionAndVerifyMissing(HTable ht, byte [] row,
    byte [] family, byte [] qualifier, long stamp)
throws Exception {
  Scan scan = new Scan(row);
  scan.addColumn(family, qualifier);
  scan.setTimeStamp(stamp);
  scan.setMaxVersions(Integer.MAX_VALUE);
  Result result = getSingleScanResult(ht, scan);
  assertNullResult(result);
}

/**
 * Verifies via Get that a null qualifier, the empty qualifier, a family-only
 * selector, and no selector all resolve to the same single cell.
 */
private void getTestNull(HTable ht, byte [] row, byte [] family,
    byte [] value)
throws Exception {

  Get get = new Get(row);
  get.addColumn(family, null);
  Result result = ht.get(get);
  assertSingleResult(result, row, family, null, value);

  get = new Get(row);
  get.addColumn(family, EMPTY);
  result = ht.get(get);
  assertSingleResult(result, row, family, EMPTY, value);

  get = new Get(row);
  get.addFamily(family);
  result = ht.get(get);
  assertSingleResult(result, row, family, EMPTY,
value);

  get = new Get(row);
  result = ht.get(get);
  assertSingleResult(result, row, family, EMPTY, value);

}

/**
 * Scan counterpart of getTestNull: null qualifier, empty qualifier,
 * family-only, and unrestricted scans must all yield the same single cell.
 */
private void scanTestNull(HTable ht, byte [] row, byte [] family,
    byte [] value)
throws Exception {

  Scan scan = new Scan();
  scan.addColumn(family, null);
  Result result = getSingleScanResult(ht, scan);
  assertSingleResult(result, row, family, EMPTY, value);

  scan = new Scan();
  scan.addColumn(family, EMPTY);
  result = getSingleScanResult(ht, scan);
  assertSingleResult(result, row, family, EMPTY, value);

  scan = new Scan();
  scan.addFamily(family);
  result = getSingleScanResult(ht, scan);
  assertSingleResult(result, row, family, EMPTY, value);

  scan = new Scan();
  result = getSingleScanResult(ht, scan);
  assertSingleResult(result, row, family, EMPTY, value);

}

/**
 * Verifies Gets against a single row whose cells live partly in the
 * memstore and partly in store files, selected by column, family, and
 * combinations thereof. The int[][] triplets passed to assertNResult are
 * {family idx, qualifier idx, value idx} in expected sorted order.
 */
private void singleRowGetTest(HTable ht, byte [][] ROWS, byte [][] FAMILIES,
    byte [][] QUALIFIERS, byte [][] VALUES)
throws Exception {

  // Single column from memstore
  Get get = new Get(ROWS[0]);
  get.addColumn(FAMILIES[4], QUALIFIERS[0]);
  Result result = ht.get(get);
  assertSingleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0]);

  // Single column from storefile
  get = new Get(ROWS[0]);
  get.addColumn(FAMILIES[2], QUALIFIERS[2]);
  result = ht.get(get);
  assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]);

  // Single column from storefile, family match
  get = new Get(ROWS[0]);
  get.addFamily(FAMILIES[7]);
  result = ht.get(get);
  assertSingleResult(result, ROWS[0], FAMILIES[7], QUALIFIERS[7], VALUES[7]);

  // Two columns, one from memstore one from storefile, same family,
  // wildcard match
  get = new Get(ROWS[0]);
  get.addFamily(FAMILIES[4]);
  result = ht.get(get);
  assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
      FAMILIES[4], QUALIFIERS[4], VALUES[4]);

  // Two columns, one from memstore one from storefile, same family,
  // explicit match
  get = new Get(ROWS[0]);
  get.addColumn(FAMILIES[4], QUALIFIERS[0]);
  get.addColumn(FAMILIES[4], QUALIFIERS[4]);
  result = ht.get(get);
  assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
      FAMILIES[4], QUALIFIERS[4], VALUES[4]);

  // Three column, one from memstore two from storefile, different families,
  // wildcard match
  get = new Get(ROWS[0]);
  get.addFamily(FAMILIES[4]);
  get.addFamily(FAMILIES[7]);
  result = ht.get(get);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] { {4, 0, 0}, {4, 4, 4}, {7, 7, 7} });

  // Multiple columns from everywhere storefile, many family, wildcard
  get = new Get(ROWS[0]);
  get.addFamily(FAMILIES[2]);
  get.addFamily(FAMILIES[4]);
  get.addFamily(FAMILIES[6]);
  get.addFamily(FAMILIES[7]);
  result = ht.get(get);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] {
        {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
      });

  // Multiple columns from everywhere storefile, many family, wildcard
  get = new Get(ROWS[0]);
  get.addColumn(FAMILIES[2], QUALIFIERS[2]);
  get.addColumn(FAMILIES[2], QUALIFIERS[4]);
  get.addColumn(FAMILIES[4], QUALIFIERS[0]);
  get.addColumn(FAMILIES[4], QUALIFIERS[4]);
  get.addColumn(FAMILIES[6], QUALIFIERS[6]);
  get.addColumn(FAMILIES[6], QUALIFIERS[7]);
  get.addColumn(FAMILIES[7], QUALIFIERS[7]);
  get.addColumn(FAMILIES[7], QUALIFIERS[8]);
  result = ht.get(get);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] {
        {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
      });

  // Everything
  get = new Get(ROWS[0]);
  result = ht.get(get);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] {
        {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
      });

  // Get around inserted columns

  get = new Get(ROWS[1]);
  result = ht.get(get);
  assertEmptyResult(result);

  get = new Get(ROWS[0]);
  get.addColumn(FAMILIES[4], QUALIFIERS[3]);
  get.addColumn(FAMILIES[2], QUALIFIERS[3]);
  result = ht.get(get);
  assertEmptyResult(result);

}

/**
 * Scan counterpart of singleRowGetTest, exercising the same column/family
 * selections through scanners instead of Gets.
 */
private void singleRowScanTest(HTable ht, byte [][] ROWS, byte [][] FAMILIES,
    byte [][] QUALIFIERS, byte [][] VALUES)
throws Exception {

  // Single column from memstore
  Scan scan = new Scan();
  scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
  Result result = getSingleScanResult(ht, scan);
  assertSingleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0]);

  // Single column from storefile
  scan = new Scan();
  scan.addColumn(FAMILIES[2], QUALIFIERS[2]);
  result = getSingleScanResult(ht, scan);
  assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]);

  // Single column from storefile, family match
  scan = new Scan();
  scan.addFamily(FAMILIES[7]);
  result = getSingleScanResult(ht, scan);
  assertSingleResult(result, ROWS[0], FAMILIES[7], QUALIFIERS[7], VALUES[7]);

  // Two columns, one from memstore one from storefile, same family,
  // wildcard match
  scan = new Scan();
  scan.addFamily(FAMILIES[4]);
  result = getSingleScanResult(ht, scan);
  assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
      FAMILIES[4], QUALIFIERS[4], VALUES[4]);

  // Two columns, one from memstore one from storefile, same family,
  // explicit match
  scan = new Scan();
  scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
  scan.addColumn(FAMILIES[4], QUALIFIERS[4]);
  result = getSingleScanResult(ht, scan);
  assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
      FAMILIES[4], QUALIFIERS[4], VALUES[4]);

  // Three column, one from memstore two from storefile, different families,
  // wildcard match
  scan = new Scan();
  scan.addFamily(FAMILIES[4]);
  scan.addFamily(FAMILIES[7]);
  result = getSingleScanResult(ht, scan);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] { {4, 0, 0}, {4, 4, 4}, {7, 7, 7} });

  // Multiple columns from everywhere storefile, many family, wildcard
  scan = new Scan();
  scan.addFamily(FAMILIES[2]);
  scan.addFamily(FAMILIES[4]);
  scan.addFamily(FAMILIES[6]);
  scan.addFamily(FAMILIES[7]);
  result = getSingleScanResult(ht, scan);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] {
        {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
      });

  // Multiple columns from everywhere storefile, many family, wildcard
  scan = new Scan();
  scan.addColumn(FAMILIES[2], QUALIFIERS[2]);
  scan.addColumn(FAMILIES[2], QUALIFIERS[4]);
  scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
  scan.addColumn(FAMILIES[4], QUALIFIERS[4]);
  scan.addColumn(FAMILIES[6], QUALIFIERS[6]);
  scan.addColumn(FAMILIES[6], QUALIFIERS[7]);
  scan.addColumn(FAMILIES[7], QUALIFIERS[7]);
  scan.addColumn(FAMILIES[7], QUALIFIERS[8]);
  result = getSingleScanResult(ht, scan);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] {
        {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
      });

  // Everything
  scan = new Scan();
  result = getSingleScanResult(ht, scan);
  assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
      new int [][] {
        {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
      });

  // Scan around inserted columns

  scan = new Scan(ROWS[1]);
  result = getSingleScanResult(ht, scan);
  assertNullResult(result);

  scan = new Scan();
  scan.addColumn(FAMILIES[4], QUALIFIERS[3]);
  scan.addColumn(FAMILIES[2], QUALIFIERS[3]);
  result = getSingleScanResult(ht, scan);
  assertNullResult(result);
}



/**
 * Verify a single column using gets.
+ * Expects family and qualifier arrays to be valid for at least + * the range: idx-2 < idx < idx+2 + */ + private void getVerifySingleColumn(HTable ht, + byte [][] ROWS, int ROWIDX, + byte [][] FAMILIES, int FAMILYIDX, + byte [][] QUALIFIERS, int QUALIFIERIDX, + byte [][] VALUES, int VALUEIDX) + throws Exception { + + Get get = new Get(ROWS[ROWIDX]); + Result result = ht.get(get); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + get = new Get(ROWS[ROWIDX]); + get.addFamily(FAMILIES[FAMILYIDX]); + result = ht.get(get); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + get = new Get(ROWS[ROWIDX]); + get.addFamily(FAMILIES[FAMILYIDX-2]); + get.addFamily(FAMILIES[FAMILYIDX]); + get.addFamily(FAMILIES[FAMILYIDX+2]); + result = ht.get(get); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + get = new Get(ROWS[ROWIDX]); + get.addColumn(FAMILIES[FAMILYIDX], QUALIFIERS[0]); + result = ht.get(get); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + get = new Get(ROWS[ROWIDX]); + get.addColumn(FAMILIES[FAMILYIDX], QUALIFIERS[1]); + get.addFamily(FAMILIES[FAMILYIDX]); + result = ht.get(get); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + get = new Get(ROWS[ROWIDX]); + get.addFamily(FAMILIES[FAMILYIDX]); + get.addColumn(FAMILIES[FAMILYIDX+1], QUALIFIERS[1]); + get.addColumn(FAMILIES[FAMILYIDX-2], QUALIFIERS[1]); + get.addFamily(FAMILIES[FAMILYIDX-1]); + get.addFamily(FAMILIES[FAMILYIDX+2]); + result = ht.get(get); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + } + + + /** + * Verify a single column using scanners. 
+ * Expects family and qualifier arrays to be valid for at least + * the range: idx-2 to idx+2 + * Expects row array to be valid for at least idx to idx+2 + */ + private void scanVerifySingleColumn(HTable ht, + byte [][] ROWS, int ROWIDX, + byte [][] FAMILIES, int FAMILYIDX, + byte [][] QUALIFIERS, int QUALIFIERIDX, + byte [][] VALUES, int VALUEIDX) + throws Exception { + + Scan scan = new Scan(); + Result result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + scan = new Scan(ROWS[ROWIDX]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + scan = new Scan(ROWS[ROWIDX], ROWS[ROWIDX+1]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + scan = new Scan(HConstants.EMPTY_START_ROW, ROWS[ROWIDX+1]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + scan = new Scan(); + scan.addFamily(FAMILIES[FAMILYIDX]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + scan = new Scan(); + scan.addColumn(FAMILIES[FAMILYIDX], QUALIFIERS[QUALIFIERIDX]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + scan = new Scan(); + scan.addColumn(FAMILIES[FAMILYIDX], QUALIFIERS[QUALIFIERIDX+1]); + scan.addFamily(FAMILIES[FAMILYIDX]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + scan = new Scan(); + scan.addColumn(FAMILIES[FAMILYIDX-1], 
QUALIFIERS[QUALIFIERIDX+1]); + scan.addColumn(FAMILIES[FAMILYIDX], QUALIFIERS[QUALIFIERIDX]); + scan.addFamily(FAMILIES[FAMILYIDX+1]); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], + QUALIFIERS[QUALIFIERIDX], VALUES[VALUEIDX]); + + } + + /** + * Verify we do not read any values by accident around a single column + * Same requirements as getVerifySingleColumn + */ + private void getVerifySingleEmpty(HTable ht, + byte [][] ROWS, int ROWIDX, + byte [][] FAMILIES, int FAMILYIDX, + byte [][] QUALIFIERS, int QUALIFIERIDX) + throws Exception { + + Get get = new Get(ROWS[ROWIDX]); + get.addFamily(FAMILIES[4]); + get.addColumn(FAMILIES[4], QUALIFIERS[1]); + Result result = ht.get(get); + assertEmptyResult(result); + + get = new Get(ROWS[ROWIDX]); + get.addFamily(FAMILIES[4]); + get.addColumn(FAMILIES[4], QUALIFIERS[2]); + result = ht.get(get); + assertEmptyResult(result); + + get = new Get(ROWS[ROWIDX]); + get.addFamily(FAMILIES[3]); + get.addColumn(FAMILIES[4], QUALIFIERS[2]); + get.addFamily(FAMILIES[5]); + result = ht.get(get); + assertEmptyResult(result); + + get = new Get(ROWS[ROWIDX+1]); + result = ht.get(get); + assertEmptyResult(result); + + } + + private void scanVerifySingleEmpty(HTable ht, + byte [][] ROWS, int ROWIDX, + byte [][] FAMILIES, int FAMILYIDX, + byte [][] QUALIFIERS, int QUALIFIERIDX) + throws Exception { + + Scan scan = new Scan(ROWS[ROWIDX+1]); + Result result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[ROWIDX+1],ROWS[ROWIDX+2]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(HConstants.EMPTY_START_ROW, ROWS[ROWIDX]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.addColumn(FAMILIES[FAMILYIDX], QUALIFIERS[QUALIFIERIDX+1]); + scan.addFamily(FAMILIES[FAMILYIDX-1]); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + } + + // + // 
Verifiers + // + + private void assertKey(KeyValue key, byte [] row, byte [] family, + byte [] qualifier, byte [] value) + throws Exception { + assertTrue("Expected row [" + Bytes.toString(row) + "] " + + "Got row [" + Bytes.toString(key.getRow()) +"]", + equals(row, key.getRow())); + assertTrue("Expected family [" + Bytes.toString(family) + "] " + + "Got family [" + Bytes.toString(key.getFamily()) + "]", + equals(family, key.getFamily())); + assertTrue("Expected qualifier [" + Bytes.toString(qualifier) + "] " + + "Got qualifier [" + Bytes.toString(key.getQualifier()) + "]", + equals(qualifier, key.getQualifier())); + assertTrue("Expected value [" + Bytes.toString(value) + "] " + + "Got value [" + Bytes.toString(key.getValue()) + "]", + equals(value, key.getValue())); + } + + private void assertNumKeys(Result result, int n) throws Exception { + assertTrue("Expected " + n + " keys but got " + result.size(), + result.size() == n); + } + + + private void assertNResult(Result result, byte [] row, + byte [][] families, byte [][] qualifiers, byte [][] values, + int [][] idxs) + throws Exception { + assertTrue("Expected row [" + Bytes.toString(row) + "] " + + "Got row [" + Bytes.toString(result.getRow()) +"]", + equals(row, result.getRow())); + assertTrue("Expected " + idxs.length + " keys but result contains " + + result.size(), result.size() == idxs.length); + + KeyValue [] keys = result.sorted(); + + for(int i=0;i 256) { + return makeNBig(base, n); + } + byte [][] ret = new byte[n][]; + for(int i=0;i 256) { + return makeNBig(base, n); + } + byte [][] ret = new byte[n][]; + for(int i=0;i> 8); + ret[i] = Bytes.add(base, new byte[]{(byte)byteB,(byte)byteA}); + } + return ret; + } + + private long [] makeStamps(int n) { + long [] stamps = new long[n]; + for(int i=0;i completionService; + private int pendingTasks = 0; + private List callables; + private boolean isClosed = false; + + /** + * Constructor. 
+ * @param table the name + * @param scan the scan + * @param preFetchCount the pre fetch count + * @throws IOException if an error occurs + */ + public ParallelScannerManager(ParallelHTable table, Scan scan, + int preFetchCount) + throws IOException { + this.table = table; + this.preFetchCount = preFetchCount; + callables = new ArrayList(); + ExecutorService threadPool = table.getExecutorService(); + completionService = + new ExecutorCompletionService(threadPool); + Set regions = table.getRegionsInfo().keySet(); + for (HRegionInfo region : regions) { + /* + The logic below determines if the region should be included in the scan. + It handles the case when the scan has specified a startRow and/or stopRow. + */ + boolean isScanInterestedInRegion = (scan.getStartRow().length == 0 + && scan.getStopRow().length == 0) || regions.size() == 1; + + if (!isScanInterestedInRegion) { + byte[] regionStartRow = region.getStartKey(); + byte[] regionEndRow = region.getEndKey(); + + isScanInterestedInRegion = isScanInterestedInRegion(scan, + regionStartRow, regionEndRow); + } + + if (isScanInterestedInRegion) { + submitTask(scan, region); + } + } + } + + protected static boolean isScanInterestedInRegion(Scan scan, + byte[] regionStartRow, + byte[] regionEndRow) { + byte[] scanStartRow = scan.getStartRow(); + byte[] scanStopRow = scan.getStopRow(); + + boolean scanStartRowLessOrEqualsRegionEndRow = isEmpty(scanStartRow) + || isEmpty(regionEndRow) + || isLesserEqual(scanStartRow, regionEndRow); + boolean scanStopRowGreaterOrEqualsRegionStartRow = isEmpty(scanStopRow) + || isEmpty(regionStartRow) + || isGreaterEqual(scanStopRow, regionStartRow); + + return scanStartRowLessOrEqualsRegionEndRow + && scanStopRowGreaterOrEqualsRegionStartRow; + } + + private static boolean isEmpty(byte[] bytes) { + return bytes == null || bytes.length == 0; + } + + private static boolean isLesserEqual(byte[] left, byte[] right) { + return Bytes.compareTo(left, right) <= 0; + } + + private static boolean 
isGreaterEqual(byte[] left, byte[] right) { + return Bytes.compareTo(left, right) >= 0; + } + + private void submitTask(Scan scan, HRegionInfo regionInfo) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Submitting fetch task for region %s", + regionInfo)); + } + RegionCallable regionCallable = new RegionCallable(table, scan, regionInfo, + preFetchCount); + callables.add(regionCallable); + reSubmitTask(regionCallable); + } + + /** + * Submits the callable as a task and increments the counter to keep track of + * the number of pending tasks. + * @param callable the callable task + */ + private void reSubmitTask(final RegionCallable callable) { + if (isClosed) { + return; + } + completionService.submit(new Callable() { + @Override + public RegionCallableResult call() throws Exception { + return table.getConnection().getRegionServerWithRetries(callable); + } + }); + pendingTasks++; + } + + /** + * Has the same semantics as {@link java.util.concurrent.CompletionService#take()} + * except that the counter of pending tasks is decremented when a task is + * taken from the queue. Also, this method will immediately return null if + * there are no more pending tasks queued. + * @return the callable result or null if no more tasks are pendingt + * @throws java.util.concurrent.ExecutionException + * if the execution of the task failed + */ + private RegionCallableResult takeTask() throws ExecutionException { + // if there are no more pending tasks then we have run out of things to + // do... + if (pendingTasks == 0) { + return null; + } + + try { + Future future = completionService.take(); + pendingTasks--; + return future.get(); + } catch (InterruptedException e) { + // we were intertupted, recurse and wait to take another + return takeTask(); + } + } + + /** + * Used to determine if all the managed scanner callables are exhausted. + */ + private boolean isCompletionServiceEmpty() { + return pendingTasks == 0; + } + + /** + * Returns the next array of results. 
+ * @return the next array of results or null if there are no more results + * @throws IOException if an error occurs + */ + public Result[] next() throws IOException { + // wait for a result to come back + RegionCallableResult result = null; + try { + result = takeTask(); + } catch (ExecutionException e) { + if (e.getCause() instanceof RegionSplitException) { + throw ((RegionSplitException) e.getCause()).getCause(); + } else if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new IllegalStateException(e); + } + } + + if (result == null) { + // all regions are out of records, that's it we're finished + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("All regions exhausted...")); + } + return null; + } else if (result.getResults() == null) { + // all regions are out of records, that's it we're finished + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Region exhausted, Region: %s, Count: %s", + result.getRegionCallable().getRegionInfo(), result + .getRegionCallable().getResultsConsumed())); + } + // that region is been exhausted, recurse to get another regions results + return next(); + } else { + // there are results, so take them and resubmit that region to get more + // results + Result[] results = result.getResults(); + // resubmit the task to get the next batch + reSubmitTask(result.getRegionCallable()); + return results; + } + } + + /** + * Clear any running tasks and close the region callables. + */ + public void close() { + isClosed = true; + // wait until all the workers have completed + while (!isCompletionServiceEmpty()) { + try { + takeTask(); + } catch (ExecutionException e) { + // do nothing + LOG.debug("Ignore", e); + } + } + + // close the individual region callables + for (RegionCallable callable : callables) { + try { + callable.close(); + } catch (Exception e) { + // just keep going + LOG.debug("Ignore", e); + } + } + } + + /** + * The region callable. 
+ */ + protected static class RegionCallable extends + ServerCallable { + static final Logger LOG = Logger.getLogger(RegionCallable.class); + + private boolean instantiated = false; + private long scannerId = -1L; + private Scan scan; + private int preFetchCount; + + private byte[] lastConsumedKey; + private long resultsConsumed; + + public RegionCallable(HTable table, Scan scan, HRegionInfo regionInfo, int preFetchCount) { + super(table.getConnection(), table.getTableName(), regionInfo.getStartKey()); + this.scan = scan; + this.preFetchCount = preFetchCount; + } + + @Override + public void instantiateServer(boolean reload) throws IOException { + if (!instantiated || reload) { + super.instantiateServer(reload); + instantiated = true; + } + } + + public RegionCallableResult call() throws IOException { + if (server == null) { + throw new IllegalStateException("The server hasn't been instantiated " + + "yet. Ensure that you've called the " + "instantiateServer method " + + "or that you are using the connection.getRegionServerWithRetries " + + "method"); + } + + if (scannerId == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Opening scanner on region: %s", + super.location.getRegionInfo())); + } + this.scannerId = server.openScanner(this.location.getRegionInfo() + .getRegionName(), scan); + } + + Result[] results = null; + try { + /* if (LOG.isDebugEnabled()) {LOG.debug(String.format( + * "Fetching results using scannerId: %s from region: %s", scannerId, + * regionInfo)); } + */ + results = server.next(scannerId, preFetchCount); + } catch (IOException e) { + IOException ioe = null; + if (e instanceof RemoteException) { + ioe = + RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + if (ioe != null && ioe instanceof NotServingRegionException) { + // Throw a DNRE so that we break out of cycle of calling NSRE + // when what we need is to open scanner against new location. 
+ // Attach NSRE to signal client that it needs to resetup scanner. + throw new RegionSplitException(this, (NotServingRegionException) ioe); + } + } + + boolean resultEmpty = isResultEmpty(results); + if (!resultEmpty) { + this.lastConsumedKey = results[results.length - 1].getRow(); + this.resultsConsumed += results.length; + } + return new RegionCallableResult(this, resultEmpty ? null : results); + } + + private boolean isResultEmpty(Result[] results) { + return results == null || results.length == 0; + } + + /** + * Close the callable which in turn closes the scanner. + */ + public void close() { + if (this.scannerId == -1L) { + return; + } + try { + this.server.close(this.scannerId); + } catch (IOException e) { + LOG.warn("Ignore, probably already closed", e); + } + this.scannerId = -1L; + } + + public byte[] getLastConsumedKey() { + return lastConsumedKey; + } + + public long getResultsConsumed() { + return resultsConsumed; + } + + public HRegionInfo getRegionInfo() { + return super.location.getRegionInfo(); + } + + public Scan getScan() { + return scan; + } + } + + /** + * The wrapper result returned from a region callable. + */ + protected static class RegionCallableResult { + private RegionCallable regionCallable; + private Result[] results; + + /** + * Constructor. + * @param regionCallable the callable + * @param results the result + */ + public RegionCallableResult(RegionCallable regionCallable, Result[] results) { + this.regionCallable = regionCallable; + this.results = results; + } + + public RegionCallable getRegionCallable() { + return regionCallable; + } + + public Result[] getResults() { + return results; + } + } + + /** + * A specialised exception to indicate that the region has split and that it + * should not be retried. 
+ */ + @SuppressWarnings("serial") + protected static class RegionSplitException extends DoNotRetryIOException { + private transient RegionCallable regionCallable; + private transient NotServingRegionException cause; + + /** + * Constructor. + * @param regionCallable the callable + * @param cause the underlying cause + */ + public RegionSplitException(RegionCallable regionCallable, + NotServingRegionException cause) { + super(); + this.regionCallable = regionCallable; + this.cause = cause; + } + + public RegionCallable getRegionCallable() { + return regionCallable; + } + + public NotServingRegionException getCause() { + return cause; + } + } +} Index: src/test/org/apache/hadoop/hbase/client/TestParallelScannerManager.java =================================================================== --- src/test/org/apache/hadoop/hbase/client/TestParallelScannerManager.java Thu Dec 17 17:48:24 EST 2009 +++ src/test/org/apache/hadoop/hbase/client/TestParallelScannerManager.java Thu Dec 17 17:48:24 EST 2009 @@ -0,0 +1,75 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import junit.framework.Assert; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Tests the ParallelScannerManager. + */ +public class TestParallelScannerManager extends HBaseTestCase { + private static int EMPTY = -1; + + /** + * Tests the logic that determines if a scan is interested in a region. + */ + public void testIsScanInterestedInRegion() { + checkPositive(1, 10, 1, 10); + checkPositive(6, 6, 1, 10); + checkPositive(6, 11, 1, 10); + checkNegative(11, 20, 1, 10); + checkNegative(6, 11, 20, 30); + checkPositive(6, 11, EMPTY, 10); + checkNegative(11, 20, EMPTY, 10); + checkPositive(6, 11, 10, EMPTY); + checkNegative(6, 9, 10, EMPTY); + checkPositive(6, 11, EMPTY, EMPTY); + checkPositive(EMPTY, 5, 1, 10); + checkNegative(EMPTY, 5, 6, 10); + checkPositive(EMPTY, EMPTY, 1, 10); + checkPositive(5, EMPTY, 1, 10); + checkNegative(5, EMPTY, 1, 4); + checkPositive(EMPTY, EMPTY, EMPTY, EMPTY); + checkPositive(EMPTY, 1, EMPTY, 10); + checkPositive(EMPTY, 10, EMPTY, 1); + } + + private void checkPositive(int scanStartRow, int scanStopRow, + int regionStartKey, int regionEndKey) { + Scan scan = new Scan(toBytes(scanStartRow), toBytes(scanStopRow)); + boolean result = ParallelScannerManager + .isScanInterestedInRegion(scan, toBytes(regionStartKey), toBytes(regionEndKey)); + Assert.assertTrue("The scan was interested in the region", result); + } + + private void checkNegative(int scanStartRow, int scanStopRow, + int regionStartKey, int regionEndKey) { + Scan scan = new Scan(toBytes(scanStartRow), toBytes(scanStopRow)); + boolean result = ParallelScannerManager + .isScanInterestedInRegion(scan, toBytes(regionStartKey), toBytes(regionEndKey)); + Assert.assertFalse("The scan was NOT interested in the region", result); + } + + private byte[] toBytes(int rowAsInt) { + return rowAsInt == EMPTY ? 
new byte[0] : Bytes.toBytes(rowAsInt); + } +} Index: src/java/org/apache/hadoop/hbase/client/ParallelHTable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/ParallelHTable.java Thu Dec 17 17:46:25 EST 2009 +++ src/java/org/apache/hadoop/hbase/client/ParallelHTable.java Thu Dec 17 17:46:25 EST 2009 @@ -0,0 +1,165 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.filter.Filter; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +/** + * {@inheritDoc} + *

+ * This extension of the {@link HTable} class provides a mechanism to initiate + * and run scanners for each region in parallel. This is achieved using the a + * {@link ExecutorService} and a {@link ServerCallable} for each region. Each + * callable in responsible for fetching 'hbase.client.scanner.caching'* rows + * from its region per invocation. + *

+ * In order to limit the number of rows pulled into memory on the client the + * region callable is not resubmitted to the {@link ExecutorService} until it's + * previous set of results has been consumed. As a result the order of the rows + * will not necessarily be in key order. This has several side effects; + *

    + *
  • The provided {@link Scan} cannot specify a + * {@link Scan#setStartRow(byte[]) start row} or a + * {@link Scan#setStopRow(byte[]) stop row}. + *
  • The provided {@link Scan#getFilter()} cannot abort the result set + * processing using the {@link Filter#filterAllRemaining()} method. + *
+ *

+ ** If the 'hbase.client.scanner.caching' results in 1 then the value is over + * written with {@link #DEFAULT_SCANNER_CACHING} value. + */ +public class ParallelHTable extends HTable { + /** + * Default scanner caching value. + */ + public static final int DEFAULT_SCANNER_CACHING = 2000; + + private ExecutorService executorService; + + /** + * Constructor. + * + * @param tableName the table name + * @param executorService the executor service + * + * @throws IOException + * if an error occurs + */ + public ParallelHTable(String tableName, ExecutorService executorService) + throws IOException { + super(tableName); + this.executorService = executorService; + } + + /** + * Constructor. + * + * @param tableName the table name + * @param executorService the executor service + * + * @throws IOException if an error occurs + */ + public ParallelHTable(byte[] tableName, ExecutorService executorService) + throws IOException { + super(tableName); + this.executorService = executorService; + } + + /** + * Constructor. + * + * @param conf the config + * @param tableName the table name + * @param executorService the executor service + * + * @throws IOException if an error occurs + */ + public ParallelHTable(HBaseConfiguration conf, String tableName, + ExecutorService executorService) throws IOException { + super(conf, tableName); + this.executorService = executorService; + } + + /** + * Constructor. + * + * @param conf the config + * @param tableName the table name + * @param executorService the executor service + * + * @throws IOException if an error occurs + */ + public ParallelHTable(HBaseConfiguration conf, byte[] tableName, + ExecutorService executorService) throws IOException { + super(conf, tableName); + this.executorService = executorService; + } + + /** + * Get a scanner on the current table as specified by the {@link Scan} object. + * Also note that if the {@link ParallelClientScanner} is used then region + * splits will NOT be handled. 
A NotServingRegionException will be thrown
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * An implementation of the {@link ResultScanner} that co-ordinates with the + * {@link ParallelScannerManager}. + */ +public class ParallelClientScanner implements ResultScanner { + private boolean isClosed = false; + private ParallelScannerManager parallelScannerManager; + private LinkedList preFetchedResults = new LinkedList(); + + /** + * Constructor. 
+ * + * @param table the table name + * @param scan the scan + * @param preFetchCount the number of rows to fetch at once + * + * @throws IOException + * if an error occurs + */ + public ParallelClientScanner(ParallelHTable table, Scan scan, + int preFetchCount) + throws IOException { + // set of the parallel scanner (which will start connecting to regions and + // fetching data) + parallelScannerManager = new ParallelScannerManager(table, scan, + preFetchCount); + } + + @Override + public Result next() throws IOException { + // it's possible that the underlying resources have been closed in a + // previous call to this method + // in that case just return null + if (isClosed) { + return null; + } + + // if there are no pre-fetched results then try and get more + if (preFetchedResults.isEmpty()) { + Result[] results = parallelScannerManager.next(); + if (results != null) { + preFetchedResults.addAll(Arrays.asList(results)); + } + } + + // if there are still no pre-fetched results then we've exhausted all + // regions + // close all the regions and return null + if (preFetchedResults.isEmpty()) { + close(); + return null; + } + + return preFetchedResults.poll(); + } + + @Override + public Result[] next(int nbRows) throws IOException { + List results = new ArrayList(); + while (results.size() < nbRows) { + Result result = next(); + if (result != null) { + results.add(result); + } else { + // all regions are exhausted, nothing more to do... 
+ break; + } + } + + return results.toArray(new Result[results.size()]); + } + + @Override + public void close() { + if (!isClosed) { + isClosed = true; + parallelScannerManager.close(); + } + } + + @Override + public Iterator iterator() { + return new ResultIterator(this); + } + + private static class ResultIterator implements Iterator { + // The next RowResult, possibly pre-read + private Result next = null; + private ResultScanner resultScanner = null; + + ResultIterator(ResultScanner resultScanner) { + this.resultScanner = resultScanner; + } + + // return true if there is another item pending, false if there isn't. + // this method is where the actual advancing takes place, but you need + // to call next() to consume it. hasNext() will only advance if there + // isn't a pending next(). + public boolean hasNext() { + if (next == null) { + try { + next = resultScanner.next(); + return next != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return true; + } + + // get the pending next item and advance the iterator. returns null if + // there is no next item. + public Result next() { + // since hasNext() does the real advancing, we call this to determine + // if there is a next before proceeding. + if (!hasNext()) { + return null; + } + + // if we get to here, then hasNext() has given us an item to return. + // we want to return the item and then null out the next pointer, so + // we use a temporary variable. + Result temp = next; + next = null; + return temp; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + } +}