diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 07ff021..4ed4cbd 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1171,7 +1171,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ throws IOException { return getScanner(scan, null); } - + protected InternalScanner getScanner(Scan scan, List additionalScanners) throws IOException { newScannerLock.readLock().lock(); try { @@ -1809,16 +1809,17 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * It is used to combine scanners from multiple Stores (aka column families). */ class RegionScanner implements InternalScanner { - private final KeyValueHeap storeHeap; + private KeyValueHeap storeHeap = null; private final byte [] stopRow; private Filter filter; private RowFilterInterface oldFilter; private List results = new ArrayList(); - private int isScan; + private Scan theScan = null; + private int isScan; + private List extraScanners = null; RegionScanner(Scan scan, List additionalScanners) { - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); -// DebugPrint.println("HRegionScanner., threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint()); + //DebugPrint.println("HRegionScanner."); this.filter = scan.getFilter(); this.oldFilter = scan.getOldFilter(); @@ -1828,23 +1829,29 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this.stopRow = scan.getStopRow(); } this.isScan = scan.isGetScan() ? -1 : 0; - + this.theScan = scan; + this.extraScanners = additionalScanners; + } + + RegionScanner(Scan scan) { + this(scan, null); + } + + void initHeap() { List scanners = new ArrayList(); - if (additionalScanners != null) { - scanners.addAll(additionalScanners); + if (extraScanners != null) { + scanners.addAll(extraScanners); } + for (Map.Entry> entry : - scan.getFamilyMap().entrySet()) { + theScan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - scanners.add(store.getScanner(scan, entry.getValue())); + scanners.add(store.getScanner(theScan, entry.getValue())); } - this.storeHeap = + this.storeHeap = new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator); } - - RegionScanner(Scan scan) { - this(scan, null); - } + /** * Reset both the filter and the old filter. @@ -1870,6 +1877,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // This could be a new thread from the last time we called next(). ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + // lazy init the store heap. + if (storeHeap == null) { + initHeap(); + } + results.clear(); boolean returnResult = nextInternal(); if (!returnResult && filterRow()) { diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index dc45154..3b0cd0c 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -666,4 +666,14 @@ public abstract class HBaseTestCase extends TestCase { Bytes.toString(actual) + ">"); } } + + public static void assertEquals(byte[] expected, + byte[] actual) { + if (Bytes.compareTo(expected, actual) != 0) { + throw new AssertionFailedError("expected:<" + + Bytes.toStringBinary(expected) + "> but was:<" + + Bytes.toStringBinary(actual) + ">"); + } + } + } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 793fa94..5e1a198 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -992,6 +992,7 @@ public class TestHRegion extends HBaseTestCase { String method = this.getName(); initHRegion(tableName, method, families); + //Putting data in Region Put put = new Put(row1); put.add(fam1, null, null); @@ -1001,18 +1002,20 @@ public class TestHRegion extends HBaseTestCase { region.put(put); Scan scan = null; - InternalScanner is = null; + HRegion.RegionScanner is = null; //Testing to see how many scanners that is produced by getScanner, starting //with known number, 2 - current = 1 scan = new Scan(); scan.addFamily(fam2); scan.addFamily(fam4); - is = region.getScanner(scan); + is = (RegionScanner) region.getScanner(scan); + is.initHeap(); // i dont like this test assertEquals(1, ((RegionScanner)is).getStoreHeap().getHeap().size()); scan = new Scan(); - is = region.getScanner(scan); + is = (RegionScanner) region.getScanner(scan); + is.initHeap(); assertEquals(families.length -1, ((RegionScanner)is).getStoreHeap().getHeap().size()); } @@ -2147,6 +2150,15 @@ public class TestHRegion extends HBaseTestCase { result.getCellValue(families[0], qualifiers[0]).getTimestamp(); Assert.assertTrue(timestamp >= prevTimestamp); prevTimestamp = timestamp; + + byte [] gotValue = null; + for (KeyValue kv : result.raw()) { + byte [] thisValue = kv.getValue(); + if (gotValue != null) { + assertEquals(gotValue, thisValue); + } + gotValue = thisValue; + } } } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 28a01a2..b45eb8a 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -297,6 +297,7 @@ public class TestMemStore extends TestCase { rwcc.completeMemstoreInsert(w); // Assert that we can read back + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); KeyValueScanner s = this.memstore.getScanners()[0]; s.seek(kv); @@ -309,7 +310,7 @@ public class TestMemStore extends TestCase { } } - public void no_testReadOwnWritesUnderConcurrency() throws Throwable { + public void testReadOwnWritesUnderConcurrency() throws Throwable { int NUM_THREADS = 8;