diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 6433453..ac76bfd 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -109,12 +109,14 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner Cell kvNext = this.current.peek(); if (kvNext == null) { this.current.close(); + this.current = null; this.current = pollRealKV(); } else { KeyValueScanner topScanner = this.heap.peek(); // no need to add current back to the heap if it is the only scanner left if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) { this.heap.add(this.current); + this.current = null; this.current = pollRealKV(); } } @@ -158,6 +160,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } else { this.heap.add(this.current); } + this.current = null; this.current = pollRealKV(); if (this.current == null) { moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); @@ -340,7 +343,12 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner while (kvScanner != null && !kvScanner.realSeekDone()) { if (kvScanner.peek() != null) { - kvScanner.enforceSeek(); + try { + kvScanner.enforceSeek(); + } catch (IOException ioe) { + kvScanner.close(); + throw ioe; + } Cell curKV = kvScanner.peek(); if (curKV != null) { KeyValueScanner nextEarliestScanner = heap.peek(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java index c7ce180..d501313 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java @@ -138,12 +138,14 @@ public class ReversedKeyValueHeap extends KeyValueHeap { } else { this.current.close(); } + this.current = null; this.current = pollRealKV(); } else { KeyValueScanner topScanner = this.heap.peek(); if (topScanner != null && this.comparator.compare(this.current, topScanner) > 0) { this.heap.add(this.current); + this.current = null; this.current = pollRealKV(); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index 852a24a..a54a589 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -210,6 +210,57 @@ public class TestKeyValueHeap extends HBaseTestCase { } } + public void testScannerException() throws IOException { + // Test for NPE issue when exception happens in scanners (HBASE-13835) + + List l1 = new ArrayList(); + l1.add(new KeyValue(row1, fam1, col5, data)); + l1.add(new KeyValue(row2, fam1, col1, data)); + l1.add(new KeyValue(row2, fam1, col2, data)); + SeekScanner s1 = new SeekScanner(l1); + scanners.add(s1); + + List l2 = new ArrayList(); + l2.add(new KeyValue(row1, fam1, col1, data)); + l2.add(new KeyValue(row1, fam1, col2, data)); + SeekScanner s2 = new SeekScanner(l2); + scanners.add(s2); + + List l3 = new ArrayList(); + l3.add(new KeyValue(row1, fam1, col3, data)); + l3.add(new KeyValue(row1, fam1, col4, data)); + l3.add(new KeyValue(row1, fam2, col1, data)); + l3.add(new KeyValue(row1, fam2, col2, data)); + l3.add(new KeyValue(row2, fam1, col3, data)); + SeekScanner s3 = new SeekScanner(l3); + scanners.add(s3); + + List l4 = new ArrayList(); + SeekScanner s4 = new SeekScanner(l4); + scanners.add(s4); + + // Creating KeyValueHeap + KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); + + try { + for (KeyValueScanner scanner : scanners) { + ((SeekScanner) scanner).setRealSeekDone(false); + } + while (kvh.next() != null); + // The pollRealKV should throw IOE. + assertTrue(false); + } catch (IOException ioe) { + kvh.close(); + } + + // It implies there is no NPE thrown from kvh.close() if getting here + for (KeyValueScanner scanner : scanners) { + // Verify that close is called and only called once for each scanner + assertTrue(((SeekScanner) scanner).isClosed()); + assertEquals(((SeekScanner) scanner).getClosedNum(), 1); + } + } + private static class Scanner extends CollectionBackedScanner { private Iterator iter; private Cell current; @@ -219,6 +270,7 @@ public class TestKeyValueHeap extends HBaseTestCase { super(list); } + @Override public void close(){ closed = true; } @@ -228,6 +280,36 @@ public class TestKeyValueHeap extends HBaseTestCase { } } + private static class SeekScanner extends Scanner { + private int closedNum = 0; + private boolean realSeekDone = true; -} + public SeekScanner(List list) { + super(list); + } + + @Override + public void close() { + super.close(); + closedNum++; + } + + public int getClosedNum() { + return closedNum; + } + + @Override + public boolean realSeekDone() { + return realSeekDone; + } + public void setRealSeekDone(boolean done) { + realSeekDone = done; + } + + @Override + public void enforceSeek() throws IOException { + throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner"); + } + } +}