NOTE: The line breaks and indentation of this patch were reconstructed from a
NOTE: whitespace-collapsed copy. Hunk line counts match the @@ headers, but
NOTE: leading whitespace may differ from the target files (try `patch -l`).
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java (revision 1227705)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java (working copy)
@@ -27,7 +27,15 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -39,7 +47,6 @@
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -77,7 +84,24 @@
 
   private HRegion r;
   private HRegionIncommon region;
+
+  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
+  final private byte[] col1, col2;
 
+  public TestScanner() throws Exception {
+    super();
+
+    firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    // Increment the least significant character so we get to next row.
+    secondRowBytes[START_KEY_BYTES.length - 1]++;
+    thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    thirdRowBytes[START_KEY_BYTES.length - 1]++;
+    thirdRowBytes[START_KEY_BYTES.length - 1]++;
+    col1 = "column1".getBytes(HConstants.UTF8_ENCODING);
+    col2 = "column2".getBytes(HConstants.UTF8_ENCODING);
+  }
+
   /**
    * Test basic stop row filter works.
    * @throws Exception
@@ -466,7 +490,69 @@
     }
   }
 
+  /**
+   * Make sure scanner returns correct result when we run a major compaction
+   * with deletes.
+   *
+   * @throws Exception
+   */
+  @SuppressWarnings("deprecation")
+  public void testScanAndConcurrentMajorCompact() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName());
+    this.r = createNewHRegion(htd, null, null);
+    HRegionIncommon hri = new HRegionIncommon(r);
+    try {
+      addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
+          firstRowBytes, secondRowBytes);
+      addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
+          firstRowBytes, secondRowBytes);
+
+      Delete dc = new Delete(firstRowBytes);
+      /* delete column1 of firstRow */
+      dc.deleteColumns(fam1, col1);
+      r.delete(dc, null, true);
+      r.flushcache();
+
+      addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
+          secondRowBytes, thirdRowBytes);
+      addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
+          secondRowBytes, thirdRowBytes);
+      r.flushcache();
+
+      InternalScanner s = r.getScanner(new Scan());
+      // run a major compact, column1 of firstRow will be cleaned.
+      r.compactStores(true);
+
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      s.next(results);
+
+      // firstRow must yield exactly one KeyValue, from fam2 (fam1:column1 was deleted)
+      assertTrue("result is not correct, keyValues : " + results,
+          results.size() == 1);
+      assertTrue(Bytes.BYTES_COMPARATOR.compare(firstRowBytes, results.get(0)
+          .getRow()) == 0);
+      assertTrue(Bytes.BYTES_COMPARATOR.compare(fam2, results.get(0)
+          .getFamily()) == 0);
+
+      results = new ArrayList<KeyValue>();
+      s.next(results);
+
+      // get secondRow
+      assertTrue(results.size() == 2);
+      assertTrue(Bytes.BYTES_COMPARATOR.compare(secondRowBytes, results.get(0)
+          .getRow()) == 0);
+      assertTrue(Bytes.BYTES_COMPARATOR.compare(fam1, results.get(0)
+          .getFamily()) == 0);
+      assertTrue(Bytes.BYTES_COMPARATOR.compare(fam2, results.get(1)
+          .getFamily()) == 0);
+    } finally {
+      this.r.close();
+      this.r.getLog().closeAndDelete();
+    }
+  }
 
+
+
   /*
    * @param hri Region
    * @param flushIndex At what row we start the flush.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1227705)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy)
@@ -20,14 +20,14 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+
 /**
  * Implements a heap merge across any number of KeyValueScanners.
  * <p>
@@ -124,7 +124,22 @@
       return false;
     }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    boolean mayContainsMoreRows = currentAsInternal.next(result, limit);
+    boolean mayContainMoreRows;
+    try {
+      mayContainMoreRows = currentAsInternal.next(result, limit);
+    } catch (PeekChangedException e) {
+      if (this.current.peek() == null) {
+        this.current.close();
+      } else {
+        this.heap.add(this.current);
+      }
+      this.current = this.heap.poll();
+      if (this.current == null) {
+        return false;
+      }
+      currentAsInternal = (InternalScanner) this.current;
+      mayContainMoreRows = currentAsInternal.next(result, limit);
+    }
     KeyValue pee = this.current.peek();
     /*
      * By definition, any InternalScanner must return false only when it has no
@@ -133,7 +148,7 @@
      * more efficient to close scanners which are not needed than keep them in
      * the heap. This is also required for certain optimizations.
      */
-    if (pee == null || !mayContainsMoreRows) {
+    if (pee == null || !mayContainMoreRows) {
       this.current.close();
     } else {
       this.heap.add(this.current);
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1227705)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy)
@@ -451,6 +451,13 @@
   private void checkReseek() throws IOException {
     if (this.heap == null && this.lastTop != null) {
       resetScannerStack(this.lastTop);
+      if (this.heap.peek() == null
+          || store.comparator.compare(this.lastTop, this.heap.peek()) != 0) {
+        LOG.debug("Storescanner.peek() is changed where before = "
+            + this.lastTop.toString() + ",and after = " + this.heap.peek());
+        this.lastTop = null;
+        throw new PeekChangedException();
+      }
       this.lastTop = null; // gone!
     }
     // else dont need to reseek
Index: src/main/java/org/apache/hadoop/hbase/regionserver/PeekChangedException.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/PeekChangedException.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/PeekChangedException.java (revision 0)
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+/**
+ * Thrown by StoreScanner.checkReseek() when, after the scanner stack is reset,
+ * StoreScanner.peek() no longer returns the KeyValue that was previously on top.
+ */
+public class PeekChangedException extends IOException {
+
+  private static final long serialVersionUID = 4802308828574427604L;
+
+
+  public PeekChangedException() {
+    super();
+  }
+
+  public PeekChangedException(String s) {
+    super(s);
+  }
+}