From 89605b6d0f5606ef2189b52e85efb43dae910bc9 Mon Sep 17 00:00:00 2001 From: Apekshit Date: Tue, 23 Feb 2016 00:31:18 -0800 Subject: [PATCH] HBASE-15236 Inconsistent cell reads over multiple bulk-loaded HFiles. In KeyValueHeap, instead of using just seq id to determine priority order in case two Cells are same, we should order determined by StoreFile.Comparater.SEQ_ID. Also changes the name in KeyValueScanner from getSequenceId() to getPriorityId() because that is what it means now (and always meant in SegmentScanner). Testing: Adds unit test to TestKeyValueHeap. Manual testing: Three cases (Tables t, t2, t3 in the jira description), single region, 2 hfiles with same seq id, timestamps and duplicate KVs. Made sure that returned kv was same for get and scan. (Apekshit) --- .../hadoop/hbase/regionserver/KeyValueHeap.java | 19 +- .../hadoop/hbase/regionserver/KeyValueScanner.java | 10 +- .../hadoop/hbase/regionserver/MemStoreScanner.java | 5 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 23 +- .../hadoop/hbase/regionserver/StoreFile.java | 46 ++-- .../hbase/regionserver/StoreFileScanner.java | 29 ++- .../hadoop/hbase/regionserver/StoreScanner.java | 5 +- .../hadoop/hbase/util/CollectionBackedScanner.java | 5 +- .../hbase/regionserver/TestKeyValueHeap.java | 269 ++++++++------------- .../hadoop/hbase/regionserver/TestStoreFile.java | 2 +- .../regionserver/compactions/TestCompactor.java | 2 +- .../compactions/TestStripeCompactionPolicy.java | 3 +- 12 files changed, 190 insertions(+), 228 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 89fc8fb..009a30b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -189,17 +189,9 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner if (comparison != 0) { return comparison; } else { - // Since both the keys are exactly the same, we break the tie in favor - // of the key which came latest. - long leftSequenceID = left.getSequenceID(); - long rightSequenceID = right.getSequenceID(); - if (leftSequenceID > rightSequenceID) { - return -1; - } else if (leftSequenceID < rightSequenceID) { - return 1; - } else { - return 0; - } + // Since both the keys are exactly the same, we break the tie in favor of higher priority + // (higher value). Since higher value (priority) should come first, we reverse sort here. + return Long.compare(right.getPriorityId(), left.getPriorityId()); } } /** @@ -406,8 +398,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return this.heap; } + /** + * @see KeyValueScanner#getPriorityId() + */ @Override - public long getSequenceID() { + public long getPriorityId() { return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index ed86a83..44ebd2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -70,13 +70,13 @@ public interface KeyValueScanner extends Shipper, Closeable { boolean reseek(Cell key) throws IOException; /** - * Get the sequence id associated with this KeyValueScanner. This is required - * for comparing multiple files to find out which one has the latest data. - * The default implementation for this would be to return 0. A file having - * lower sequence id will be considered to be the older one. + * Get the priority associated with this KeyValueScanner. Larger the value, higher the priority. + * This is required for comparing multiple scanners to find out which one has the latest data. + * The default implementation for this would be to return 0. KeyValue from a scanner with higher + * priority (higher value) will be used first. */ // TODO: Implement SequenceId Interface instead. - long getSequenceID(); + long getPriorityId(); /** * Close the KeyValue scanner. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index dfcec25..4fbc19d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -186,11 +186,12 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { } /** - * MemStoreScanner returns max value as sequence id because it will + * MemStoreScanner returns Long.MAX_VALUE (highest priority) as priority id because it will * always have the latest data among all files. + * @see KeyValueScanner#getPriorityId() */ @Override - public synchronized long getSequenceID() { + public synchronized long getPriorityId() { return Long.MAX_VALUE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index b5aabb8..9608da9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -33,7 +33,9 @@ import org.apache.hadoop.hbase.client.Scan; @InterfaceAudience.Private public class SegmentScanner implements KeyValueScanner { - private long sequenceID = Long.MAX_VALUE; + // Priority of this scanner relative to other scanners. Higher value means higher priority and + // Long.MAX_VALUE is the highest priority. + private long priorityId = Long.MAX_VALUE; // the observed structure private final Segment segment; @@ -208,14 +210,11 @@ public class SegmentScanner implements KeyValueScanner { } /** - * Get the sequence id associated with this KeyValueScanner. This is required - * for comparing multiple files (or memstore segments) scanners to find out - * which one has the latest data. - * + * @see KeyValueScanner#getPriorityId() */ @Override - public long getSequenceID() { - return sequenceID; + public long getPriorityId() { + return priorityId; } /** @@ -297,12 +296,12 @@ public class SegmentScanner implements KeyValueScanner { } /** - * Set the sequence id of the scanner. + * Set the priority id of the scanner. Higher value has higher priority. * This is used to determine an order between memory segment scanners. - * @param x a unique sequence id + * @param x a unique priority id */ - public void setSequenceID(long x) { - sequenceID = x; + public void setPriorityId(long x) { + priorityId = x; } /** @@ -321,7 +320,7 @@ public class SegmentScanner implements KeyValueScanner { @Override public String toString() { String res = "Store segment scanner of type "+this.getClass().getName()+"; "; - res += "sequence id "+getSequenceID()+"; "; + res += "priority id " + getPriorityId() + "; "; res += getSegment().toString(); return res; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 868bee0..709c6b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -258,6 +258,7 @@ public class StoreFile { this.fileInfo = other.fileInfo; this.cacheConf = other.cacheConf; this.cfBloomType = other.cfBloomType; + this.metadataMap = other.metadataMap; } /** @@ -391,7 +392,7 @@ public class StoreFile { if (startPos != -1) { bulkLoadedHFile = true; } - return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); + return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY)); } @VisibleForTesting @@ -1222,19 +1223,27 @@ public class StoreFile { } /** - * Get a scanner to scan over this StoreFile. Do not use - * this overload if using this scanner for compactions. + * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting + * {@code isCompaction} to false, {@code readPt} to 0 and {@code priority} to 0. + * Do not use this overload if using this scanner for compactions. * - * @param cacheBlocks should this scanner cache blocks? - * @param pread use pread (for highly concurrent small readers) - * @return a scanner + * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) + */ + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { + // 0 is passed as readpoint because this method is only used by test + // where StoreFile is directly operated upon + return getStoreFileScanner(cacheBlocks, pread, false, 0, 0); + } + + /** + * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting + * {@code priority} to 0. + * + * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, - boolean pread) { - return getStoreFileScanner(cacheBlocks, pread, false, - // 0 is passed as readpoint because this method is only used by test - // where StoreFile is directly operated upon - 0); + public StoreFileScanner getStoreFileScanner( + boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { + return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0); } /** @@ -1243,16 +1252,17 @@ public class StoreFile { * @param cacheBlocks should this scanner cache blocks? * @param pread use pread (for highly concurrent small readers) * @param isCompaction is scanner being used for compaction? + * @param priority Priority of this scanner relative to other scanners, when same keyvalue is + * present in both. Higher values means higher priority. * @return a scanner */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, - boolean pread, - boolean isCompaction, long readPt) { + public StoreFileScanner getStoreFileScanner( + boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long priority) { // Increment the ref count refCount.incrementAndGet(); - return new StoreFileScanner(this, - getScanner(cacheBlocks, pread, isCompaction), - !isCompaction, reader.hasMVCCInfo(), readPt); + return new StoreFileScanner( + this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), + readPt, priority); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 924e7f6..d13ad62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -23,6 +23,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -66,17 +67,25 @@ public class StoreFileScanner implements KeyValueScanner { private long readPt; + // Priority of this scanner relative to other scanners when duplicate key-value is found. + // Higher values means higher priority. + private long priority; + /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} - * @param hfs HFile scanner + * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. + * @param readPt MVCC value to use to filter out the updates newer than this scanner. + * @param hasMVCC Set to true if underlying store file reader has MVCC info. + * @param priority Priority of the scanner. See {@link KeyValueScanner#getPriorityId()} */ public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, - boolean hasMVCC, long readPt) { + boolean hasMVCC, long readPt, long priority) { this.readPt = readPt; this.reader = reader; this.hfs = hfs; this.enforceMVCC = useMVCC; this.hasMVCCInfo = hasMVCC; + this.priority = priority; } boolean isPrimaryReplica() { @@ -116,11 +125,14 @@ public class StoreFileScanner implements KeyValueScanner { ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { List scanners = new ArrayList( files.size()); - for (StoreFile file : files) { - StoreFile.Reader r = file.createReader(canUseDrop); + + List sorted_files = new ArrayList<>(files); + Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID); + for (int i = 0; i < sorted_files.size(); i++) { + Reader r = sorted_files.get(i).createReader(); r.setReplicaStoreFile(isPrimaryReplica); StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, - isCompaction, readPt); + isCompaction, readPt, i); scanner.setScanQueryMatcher(matcher); scanners.add(scanner); } @@ -304,9 +316,12 @@ public class StoreFileScanner implements KeyValueScanner { return s.next(); } + /** + * @see KeyValueScanner#getPriorityId() + */ @Override - public long getSequenceID() { - return reader.getSequenceID(); + public long getPriorityId() { + return priority; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index ecae787..9f0e8a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -887,8 +887,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return false; } + /** + * @see KeyValueScanner#getPriorityId() + */ @Override - public long getSequenceID() { + public long getPriorityId() { return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java index 4720880..c7954de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java @@ -116,8 +116,11 @@ public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner { return false; } + /** + * @see org.apache.hadoop.hbase.regionserver.KeyValueScanner#getPriorityId() + */ @Override - public long getSequenceID() { + public long getPriorityId() { return 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index aff40c1..4a7ec18 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.hbase.Cell; @@ -38,178 +38,105 @@ import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) public class TestKeyValueHeap extends HBaseTestCase { - private static final boolean PRINT = false; - - List scanners = new ArrayList(); - - private byte[] row1; - private byte[] fam1; - private byte[] col1; - private byte[] data; + private byte[] row1 = Bytes.toBytes("row1"); + private byte[] fam1 = Bytes.toBytes("fam1"); + private byte[] col1 = Bytes.toBytes("col1"); + private byte[] data = Bytes.toBytes("data"); + + private byte[] row2 = Bytes.toBytes("row2"); + private byte[] fam2 = Bytes.toBytes("fam2"); + private byte[] col2 = Bytes.toBytes("col2"); + + private byte[] col3 = Bytes.toBytes("col3"); + private byte[] col4 = Bytes.toBytes("col4"); + private byte[] col5 = Bytes.toBytes("col5"); + + // Variable name encoding. kv + Cell kv111 = new KeyValue(row1, fam1, col1, data); + Cell kv112 = new KeyValue(row1, fam1, col2, data); + Cell kv113 = new KeyValue(row1, fam1, col3, data); + Cell kv114 = new KeyValue(row1, fam1, col4, data); + Cell kv115 = new KeyValue(row1, fam1, col5, data); + Cell kv121 = new KeyValue(row1, fam2, col1, data); + Cell kv122 = new KeyValue(row1, fam2, col2, data); + Cell kv211 = new KeyValue(row2, fam1, col1, data); + Cell kv212 = new KeyValue(row2, fam1, col2, data); + Cell kv213 = new KeyValue(row2, fam1, col3, data); + + TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212)); + TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112)); + TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); + + List scanners = new ArrayList(Arrays.asList(s1, s2, s3)); + + /* + * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned + * Cells are same as {@code expected}. + * @return List of Cells returned from scanners. + */ + public List assertCells(List expected, List scanners) + throws IOException { + //Creating KeyValueHeap + KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); - private byte[] row2; - private byte[] fam2; - private byte[] col2; + List actual = new ArrayList<>(); + while(kvh.peek() != null){ + actual.add(kvh.next()); + } - private byte[] col3; - private byte[] col4; - private byte[] col5; + assertEquals(expected, actual); + return actual; + } @Before public void setUp() throws Exception { super.setUp(); - data = Bytes.toBytes("data"); - row1 = Bytes.toBytes("row1"); - fam1 = Bytes.toBytes("fam1"); - col1 = Bytes.toBytes("col1"); - row2 = Bytes.toBytes("row2"); - fam2 = Bytes.toBytes("fam2"); - col2 = Bytes.toBytes("col2"); - col3 = Bytes.toBytes("col3"); - col4 = Bytes.toBytes("col4"); - col5 = Bytes.toBytes("col5"); } @Test public void testSorted() throws IOException{ //Cases that need to be checked are: - //1. The "smallest" KeyValue is in the same scanners as current + //1. The "smallest" Cell is in the same scanners as current //2. Current scanner gets empty - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - scanners.add(new Scanner(l1)); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - scanners.add(new Scanner(l2)); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - scanners.add(new Scanner(l3)); - - List expected = new ArrayList(); - expected.add(new KeyValue(row1, fam1, col1, data)); - expected.add(new KeyValue(row1, fam1, col2, data)); - expected.add(new KeyValue(row1, fam1, col3, data)); - expected.add(new KeyValue(row1, fam1, col4, data)); - expected.add(new KeyValue(row1, fam1, col5, data)); - expected.add(new KeyValue(row1, fam2, col1, data)); - expected.add(new KeyValue(row1, fam2, col2, data)); - expected.add(new KeyValue(row2, fam1, col1, data)); - expected.add(new KeyValue(row2, fam1, col2, data)); - expected.add(new KeyValue(row2, fam1, col3, data)); + List expected = Arrays.asList( + kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213); - //Creating KeyValueHeap - KeyValueHeap kvh = - new KeyValueHeap(scanners, CellComparator.COMPARATOR); - - List actual = new ArrayList(); - while(kvh.peek() != null){ - actual.add(kvh.next()); - } - - assertEquals(expected.size(), actual.size()); - for(int i=0; i actual = assertCells(expected, scanners); //Check if result is sorted according to Comparator for(int i=0; i l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - scanners.add(new Scanner(l1)); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - scanners.add(new Scanner(l2)); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - scanners.add(new Scanner(l3)); - - List expected = new ArrayList(); - expected.add(new KeyValue(row2, fam1, col1, data)); + List expected = Arrays.asList(kv211); //Creating KeyValueHeap KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); - KeyValue seekKv = new KeyValue(row2, fam1, null, null); + Cell seekKv = new KeyValue(row2, fam1, null, null); kvh.seek(seekKv); - List actual = new ArrayList(); - actual.add(kvh.peek()); - - assertEquals(expected.size(), actual.size()); - for(int i=0; i actual = Arrays.asList(kvh.peek()); + assertEquals("Expected = " + Arrays.toString(expected.toArray()) + + "\n Actual = " + Arrays.toString(actual.toArray()), expected, actual); } @Test public void testScannerLeak() throws IOException { // Test for unclosed scanners (HBASE-1927) - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - Scanner s1 = new Scanner(l1); - scanners.add(s1); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - Scanner s2 = new Scanner(l2); - scanners.add(s2); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - Scanner s3 = new Scanner(l3); - scanners.add(s3); - - List l4 = new ArrayList(); - Scanner s4 = new Scanner(l4); + TestScanner s4 = new TestScanner(new ArrayList()); scanners.add(s4); //Creating KeyValueHeap @@ -225,7 +152,7 @@ public class TestKeyValueHeap extends HBaseTestCase { assertTrue(kvh.scannersForDelayedClose.contains(s4)); kvh.close(); for(KeyValueScanner scanner : scanners) { - assertTrue(((Scanner)scanner).isClosed()); + assertTrue(((TestScanner)scanner).isClosed()); } } @@ -233,38 +160,19 @@ public class TestKeyValueHeap extends HBaseTestCase { public void testScannerException() throws IOException { // Test for NPE issue when exception happens in scanners (HBASE-13835) - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - SeekScanner s1 = new SeekScanner(l1); - scanners.add(s1); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - SeekScanner s2 = new SeekScanner(l2); - scanners.add(s2); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - SeekScanner s3 = new SeekScanner(l3); - scanners.add(s3); - - List l4 = new ArrayList(); - SeekScanner s4 = new SeekScanner(l4); - scanners.add(s4); + TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212)); + TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112)); + TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); + TestScanner s4 = new SeekTestScanner(new ArrayList()); + + List scanners = new ArrayList(Arrays.asList(s1, s2, s3, s4)); // Creating KeyValueHeap KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); try { for (KeyValueScanner scanner : scanners) { - ((SeekScanner) scanner).setRealSeekDone(false); + ((SeekTestScanner) scanner).setRealSeekDone(false); } while (kvh.next() != null); // The pollRealKV should throw IOE. @@ -276,20 +184,47 @@ public class TestKeyValueHeap extends HBaseTestCase { // It implies there is no NPE thrown from kvh.close() if getting here for (KeyValueScanner scanner : scanners) { // Verify that close is called and only called once for each scanner - assertTrue(((SeekScanner) scanner).isClosed()); - assertEquals(((SeekScanner) scanner).getClosedNum(), 1); + assertTrue(((SeekTestScanner) scanner).isClosed()); + assertEquals(((SeekTestScanner) scanner).getClosedNum(), 1); + } + } + + @Test + public void testPriorityId() throws IOException { + Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa")); + Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb")); + { + TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1); + TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2); + List expected = Arrays.asList(kv111, kv112, kv113B, kv113A); + assertCells(expected, new ArrayList(Arrays.asList(scan1, scan2))); + } + { + TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2); + TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1); + List expected = Arrays.asList(kv111, kv112, kv113A, kv113B); + assertCells(expected, new ArrayList(Arrays.asList(scan1, scan2))); } } - private static class Scanner extends CollectionBackedScanner { - private Iterator iter; - private Cell current; + private static class TestScanner extends CollectionBackedScanner { private boolean closed = false; + private long priority = 0; - public Scanner(List list) { + public TestScanner(List list) { super(list); } + public TestScanner(List list, long priority) { + this(list); + this.priority = priority; + } + + @Override + public long getPriorityId() { + return priority; + } + @Override public void close(){ closed = true; @@ -300,11 +235,11 @@ public class TestKeyValueHeap extends HBaseTestCase { } } - private static class SeekScanner extends Scanner { + private static class SeekTestScanner extends TestScanner { private int closedNum = 0; private boolean realSeekDone = true; - public SeekScanner(List list) { + public SeekTestScanner(List list) { super(list); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index 829aa73..a9215d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -211,7 +211,7 @@ public class TestStoreFile extends HBaseTestCase { when(hcd.getName()).thenReturn(cf); when(store.getFamily()).thenReturn(hcd); StoreFileScanner scanner = - new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0); + new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0); Scan scan = new Scan(); scan.setColumnFamilyTimeRange(cf, 0, 1); assertFalse(scanner.shouldUseScanner(scan, store, 0)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java index 6ec4cd4..328f7d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java @@ -62,7 +62,7 @@ public class TestCompactor { when(r.length()).thenReturn(1L); when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); - when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())) + when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong())) .thenReturn(mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.createReader()).thenReturn(r); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 146882b..5973655 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -749,7 +749,8 @@ public class TestStripeCompactionPolicy { when(r.length()).thenReturn(size); when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); - when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn( + when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong())) + .thenReturn( mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.createReader(anyBoolean())).thenReturn(r); -- 2.3.2 (Apple Git-55)