Index: src/main/java/org/apache/hadoop/hbase/regionserver/PeekChangedException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/PeekChangedException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/PeekChangedException.java (revision 0) @@ -0,0 +1,22 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +/** + * Thrown when storescanner.checkReseek() find storescanner.peek() is changed + * after majorCompaction + */ +public class PeekChangedException extends IOException { + + private static final long serialVersionUID = 4802308828574427604L; + + + public PeekChangedException() { + super(); + } + + public PeekChangedException(String s) { + super(s); + } + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1227045) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -451,6 +451,14 @@ private void checkReseek() throws IOException { if (this.heap == null && this.lastTop != null) { resetScannerStack(this.lastTop); + if (this.heap.peek() == null + || store.comparator.compare(this.lastTop, this.heap.peek()) != 0) { + LOG.debug("Storescanner.peek() is changed where before = " + + this.lastTop.toString() + ",and after = " + + (this.heap.peek() == null ? "null" : this.heap.peek().toString())); + this.lastTop = null; + throw new PeekChangedException(); + } this.lastTop = null; // gone! } // else dont need to reseek Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1179442) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KVComparator; - import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + /** * Implements a heap merge across any number of KeyValueScanners. *
@@ -124,7 +124,22 @@ return false; } InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean mayContainsMoreRows = currentAsInternal.next(result, limit); + boolean mayContainsMoreRows; + try { + mayContainsMoreRows = currentAsInternal.next(result, limit); + } catch (PeekChangedException e) { + if (this.current.peek() == null) { + this.current.close(); + } else { + this.heap.add(this.current); + } + this.current = this.heap.poll(); + if (this.current == null) { + return false; + } + currentAsInternal = (InternalScanner) this.current; + mayContainsMoreRows = currentAsInternal.next(result, limit); + } KeyValue pee = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no