Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java (revision 1228762) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java (working copy) @@ -27,7 +27,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -39,7 +47,6 @@ import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.experimental.categories.Category; /** @@ -77,7 +84,24 @@ private HRegion r; private HRegionIncommon region; + + private byte[] firstRowBytes, secondRowBytes, thirdRowBytes; + final private byte[] col1, col2; + public TestScanner() throws Exception { + super(); + + firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); + secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); + // Increment the least significant character so we get to next row. 
+ secondRowBytes[START_KEY_BYTES.length - 1]++; + thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); + thirdRowBytes[START_KEY_BYTES.length - 1]++; + thirdRowBytes[START_KEY_BYTES.length - 1]++; + col1 = "column1".getBytes(HConstants.UTF8_ENCODING); + col2 = "column2".getBytes(HConstants.UTF8_ENCODING); + } + /** * Test basic stop row filter works. * @throws Exception @@ -466,7 +490,69 @@ } } + /** + * Make sure scanner returns correct result when we run a major compaction + * with deletes. + * + * @throws Exception + */ + @SuppressWarnings("deprecation") + public void testScanAndConcurrentMajorCompact() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName()); + this.r = createNewHRegion(htd, null, null); + HRegionIncommon hri = new HRegionIncommon(r); + try { + addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), + firstRowBytes, secondRowBytes); + addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), + firstRowBytes, secondRowBytes); + + Delete dc = new Delete(firstRowBytes); + /* delete column1 of firstRow */ + dc.deleteColumns(fam1, col1); + r.delete(dc, null, true); + r.flushcache(); + + addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), + secondRowBytes, thirdRowBytes); + addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), + secondRowBytes, thirdRowBytes); + r.flushcache(); + + InternalScanner s = r.getScanner(new Scan()); + // run a major compact, column1 of firstRow will be cleaned. 
+ r.compactStores(true); + + List<KeyValue> results = new ArrayList<KeyValue>(); + s.next(results); + + // make sure returns column2 of firstRow + assertTrue("result is not correct, keyValues : " + results, + results.size() == 1); + assertTrue(Bytes.BYTES_COMPARATOR.compare(firstRowBytes, results.get(0) + .getRow()) == 0); + assertTrue(Bytes.BYTES_COMPARATOR.compare(fam2, results.get(0) + .getFamily()) == 0); + + results = new ArrayList<KeyValue>(); + s.next(results); + + // get secondRow + assertTrue(results.size() == 2); + assertTrue(Bytes.BYTES_COMPARATOR.compare(secondRowBytes, results.get(0) + .getRow()) == 0); + assertTrue(Bytes.BYTES_COMPARATOR.compare(fam1, results.get(0) + .getFamily()) == 0); + assertTrue(Bytes.BYTES_COMPARATOR.compare(fam2, results.get(1) + .getFamily()) == 0); + } finally { + this.r.close(); + this.r.getLog().closeAndDelete(); + } + } + + /* * @param hri Region * @param flushIndex At what row we start the flush. Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1228762) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KVComparator; - import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + /** * Implements a heap merge across any number of KeyValueScanners. *

@@ -124,7 +124,7 @@ return false; } InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean mayContainsMoreRows = currentAsInternal.next(result, limit); + boolean mayContainMoreRows = currentAsInternal.next(result, limit); KeyValue pee = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no @@ -133,7 +133,7 @@ * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations. */ - if (pee == null || !mayContainsMoreRows) { + if (pee == null || !mayContainMoreRows) { this.current.close(); } else { this.heap.add(this.current); Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1228762) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -298,7 +298,9 @@ @Override public synchronized boolean next(List outResult, int limit) throws IOException { - checkReseek(); + if (checkReseek()) { + return true; + } // if the heap was left null, then the scanners had previously run out anyways, close and // return. @@ -448,12 +450,25 @@ // Let the next() call handle re-creating and seeking } - private void checkReseek() throws IOException { + /** + * @return true if top of heap has changed (and KeyValueHeap has to try the + * next KV) + * @throws IOException + */ + private boolean checkReseek() throws IOException { if (this.heap == null && this.lastTop != null) { resetScannerStack(this.lastTop); + if (this.heap.peek() == null + || store.comparator.compare(this.lastTop, this.heap.peek()) != 0) { + LOG.debug("Storescanner.peek() is changed where before = " + + this.lastTop.toString() + ",and after = " + this.heap.peek()); + this.lastTop = null; + return true; + } this.lastTop = null; // gone! 
} // else dont need to reseek + return false; } private void resetScannerStack(KeyValue lastTopKey) throws IOException {