From c96d5a9d21073fd6ee9dcffd0bd35e6a5a8e867b Mon Sep 17 00:00:00 2001 From: Apekshit Date: Tue, 23 Feb 2016 00:31:18 -0800 Subject: [PATCH] HBASE-15236 Inconsistent cell reads over multiple bulk-loaded HFiles. In KeyValueHeap, if two cells are the same, i.e. have the same key and timestamp, then instead of directly using seq id to determine the newer one, we should use StoreFile.Comparators.SEQ_ID because that's what is used to determine the order of hfiles. In this patch, we assign each scanner an order based on its index in the storefiles list, which is then used in KeyValueHeap to disambiguate between same cells. Changes the getSequenceID() in KeyValueScanner class to getScannerOrder(). Testing: Adds unit test to TestKeyValueHeap. Manual testing: Three cases (Tables t, t2, t3 in the jira description), single region, 2 hfiles with same seq id, timestamps and duplicate KVs. Made sure that returned kv was same for get and scan. (Apekshit) Change-Id: I22600c91c0a51fb63eb17db73472839d2f13957c --- .../hadoop/hbase/regionserver/KeyValueHeap.java | 20 +- .../hadoop/hbase/regionserver/KeyValueScanner.java | 12 +- .../hadoop/hbase/regionserver/MemStoreScanner.java | 7 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 35 +-- .../hadoop/hbase/regionserver/StoreFile.java | 6 +- .../hadoop/hbase/regionserver/StoreFileReader.java | 43 ++-- .../hbase/regionserver/StoreFileScanner.java | 38 ++- .../hadoop/hbase/regionserver/StoreScanner.java | 5 +- .../hadoop/hbase/util/CollectionBackedScanner.java | 5 +- .../hbase/regionserver/TestKeyValueHeap.java | 269 ++++++++------------- .../hadoop/hbase/regionserver/TestStoreFile.java | 2 +- .../regionserver/compactions/TestCompactor.java | 2 +- .../compactions/TestStripeCompactionPolicy.java | 3 +- 13 files changed, 212 insertions(+), 235 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 89fc8fb 
100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -189,17 +189,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner if (comparison != 0) { return comparison; } else { - // Since both the keys are exactly the same, we break the tie in favor - // of the key which came latest. - long leftSequenceID = left.getSequenceID(); - long rightSequenceID = right.getSequenceID(); - if (leftSequenceID > rightSequenceID) { - return -1; - } else if (leftSequenceID < rightSequenceID) { - return 1; - } else { - return 0; - } + // Since both the keys are exactly the same, we break the tie in favor of higher ordered + // scanner since it'll have newer data. Since higher value should come first, we reverse + // sort here. + return Long.compare(right.getScannerOrder(), left.getScannerOrder()); } } /** @@ -406,8 +399,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return this.heap; } + /** + * @see KeyValueScanner#getScannerOrder() + */ @Override - public long getSequenceID() { + public long getScannerOrder() { return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index ed86a83..44b081b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -70,13 +70,13 @@ public interface KeyValueScanner extends Shipper, Closeable { boolean reseek(Cell key) throws IOException; /** - * Get the sequence id associated with this KeyValueScanner. This is required - * for comparing multiple files to find out which one has the latest data. - * The default implementation for this would be to return 0. 
A file having - * lower sequence id will be considered to be the older one. + * Get the order of this KeyValueScanner. This is only relevant for StoreFileScanners and + * MemStoreScanners (other scanners simply return 0). This is required for comparing multiple + * files to find out which one has the latest data. StoreFileScanners are ordered from 0 + * (oldest) to newest in increasing order. MemStoreScanner gets Long.MAX_VALUE since it always + * contains freshest data. */ - // TODO: Implement SequenceId Interface instead. - long getSequenceID(); + long getScannerOrder(); /** * Close the KeyValue scanner. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index dfcec25..01a7ff3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -186,11 +186,12 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { } /** - * MemStoreScanner returns max value as sequence id because it will - * always have the latest data among all files. + * MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all + * scanners. 
+ * @see KeyValueScanner#getScannerOrder() */ @Override - public synchronized long getSequenceID() { + public long getScannerOrder() { return Long.MAX_VALUE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index b5aabb8..45f72d83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -33,7 +33,12 @@ import org.apache.hadoop.hbase.client.Scan; @InterfaceAudience.Private public class SegmentScanner implements KeyValueScanner { - private long sequenceID = Long.MAX_VALUE; + /** + * Order of this scanner relative to other scanners. See + * {@link KeyValueScanner#getScannerOrder()}. + */ + private long scannerOrder; + private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE; // the observed structure private final Segment segment; @@ -52,6 +57,13 @@ public class SegmentScanner implements KeyValueScanner { private Cell last = null; protected SegmentScanner(Segment segment, long readPoint) { + this(segment, readPoint, DEFAULT_SCANNER_ORDER); + } + + /** + * @param scannerOrder see {@link KeyValueScanner#getScannerOrder()}. + */ + protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) { this.segment = segment; this.readPoint = readPoint; iter = segment.iterator(); @@ -59,6 +71,7 @@ public class SegmentScanner implements KeyValueScanner { current = getNext(); //increase the reference count so the underlying structure will not be de-allocated this.segment.incScannerCount(); + this.scannerOrder = scannerOrder; } /** @@ -208,14 +221,11 @@ public class SegmentScanner implements KeyValueScanner { } /** - * Get the sequence id associated with this KeyValueScanner. 
This is required - * for comparing multiple files (or memstore segments) scanners to find out - * which one has the latest data. - * + * @see KeyValueScanner#getScannerOrder() */ @Override - public long getSequenceID() { - return sequenceID; + public long getScannerOrder() { + return scannerOrder; } /** @@ -297,15 +307,6 @@ public class SegmentScanner implements KeyValueScanner { } /** - * Set the sequence id of the scanner. - * This is used to determine an order between memory segment scanners. - * @param x a unique sequence id - */ - public void setSequenceID(long x) { - sequenceID = x; - } - - /** * Returns whether the given scan should seek in this segment * @return whether the given scan should seek in this segment */ @@ -321,7 +322,7 @@ public class SegmentScanner implements KeyValueScanner { @Override public String toString() { String res = "Store segment scanner of type "+this.getClass().getName()+"; "; - res += "sequence id "+getSequenceID()+"; "; + res += "Scanner order " + getScannerOrder() + "; "; res += getSegment().toString(); return res; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index d66d8bd..05d1e33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -158,7 +158,8 @@ public class StoreFile { Bytes.toBytes("BULKLOAD_TIMESTAMP"); /** - * Map of the metadata entries in the corresponding HFile + * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened + * after which it is not modified again. 
*/ private Map metadataMap; @@ -238,6 +239,7 @@ public class StoreFile { this.fileInfo = other.fileInfo; this.cacheConf = other.cacheConf; this.cfBloomType = other.cfBloomType; + this.metadataMap = other.metadataMap; } /** @@ -371,7 +373,7 @@ public class StoreFile { if (startPos != -1) { bulkLoadedHFile = true; } - return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); + return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY)); } @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index a2ad5a4..79ea95b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -114,19 +114,27 @@ public class StoreFileReader { } /** - * Get a scanner to scan over this StoreFile. Do not use - * this overload if using this scanner for compactions. + * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting + * {@code isCompaction} to false, {@code readPt} to 0 and {@code scannerOrder} to 0. + * Do not use this overload if using this scanner for compactions. * - * @param cacheBlocks should this scanner cache blocks? - * @param pread use pread (for highly concurrent small readers) - * @return a scanner + * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) + */ + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { + // 0 is passed as readpoint because this method is only used by test + // where StoreFile is directly operated upon + return getStoreFileScanner(cacheBlocks, pread, false, 0, 0); + } + + /** + * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting + * {@code scannerOrder} to 0. 
+ * + * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, - boolean pread) { - return getStoreFileScanner(cacheBlocks, pread, false, - // 0 is passed as readpoint because this method is only used by test - // where StoreFile is directly operated upon - 0); + public StoreFileScanner getStoreFileScanner( + boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { + return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0); } /** @@ -135,16 +143,17 @@ public class StoreFileReader { * @param cacheBlocks should this scanner cache blocks? * @param pread use pread (for highly concurrent small readers) * @param isCompaction is scanner being used for compaction? + * @param scannerOrder Order of this scanner relative to other scanners. See + * {@link KeyValueScanner#getScannerOrder()}. * @return a scanner */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, - boolean pread, - boolean isCompaction, long readPt) { + public StoreFileScanner getStoreFileScanner( + boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long scannerOrder) { // Increment the ref count refCount.incrementAndGet(); - return new StoreFileScanner(this, - getScanner(cacheBlocks, pread, isCompaction), - !isCompaction, reader.hasMVCCInfo(), readPt); + return new StoreFileScanner( + this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), + readPt, scannerOrder); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index e7f8f88..abade0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -23,6 +23,7 @@ import java.io.FileNotFoundException; import 
java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -65,17 +66,37 @@ public class StoreFileScanner implements KeyValueScanner { private long readPt; + // Order of this scanner relative to other scanners when a duplicate key-value is found. + // A higher value means the scanner has newer data. + private long scannerOrder; + /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} - * @param hfs HFile scanner + * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. + * @param readPt MVCC value to use to filter out the updates newer than this scanner. + * @param hasMVCC Set to true if underlying store file reader has MVCC info. */ public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC, boolean hasMVCC, long readPt) { + this (reader, hfs, useMVCC, hasMVCC, readPt, 0); + } + + /** + * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} + * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. + * @param readPt MVCC value to use to filter out the updates newer than this scanner. + * @param hasMVCC Set to true if underlying store file reader has MVCC info. + * @param scannerOrder Order of the scanner relative to other scanners. + * See {@link KeyValueScanner#getScannerOrder()}. 
+ */ + public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC, + boolean hasMVCC, long readPt, long scannerOrder) { this.readPt = readPt; this.reader = reader; this.hfs = hfs; this.enforceMVCC = useMVCC; this.hasMVCCInfo = hasMVCC; + this.scannerOrder = scannerOrder; } boolean isPrimaryReplica() { @@ -115,11 +136,13 @@ public class StoreFileScanner implements KeyValueScanner { ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { List scanners = new ArrayList( files.size()); - for (StoreFile file : files) { - StoreFileReader r = file.createReader(canUseDrop); + List sorted_files = new ArrayList<>(files); + Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID); + for (int i = 0; i < sorted_files.size(); i++) { + StoreFileReader r = sorted_files.get(i).createReader(); r.setReplicaStoreFile(isPrimaryReplica); StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, - isCompaction, readPt); + isCompaction, readPt, i); scanner.setScanQueryMatcher(matcher); scanners.add(scanner); } @@ -303,9 +326,12 @@ public class StoreFileScanner implements KeyValueScanner { return s.next(); } + /** + * @see KeyValueScanner#getScannerOrder() + */ @Override - public long getSequenceID() { - return reader.getSequenceID(); + public long getScannerOrder() { + return scannerOrder; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index ecae787..7ebae94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -887,8 +887,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return false; } + /** + * @see KeyValueScanner#getScannerOrder() + */ @Override - public long getSequenceID() { + public long getScannerOrder() { return 0; } diff 
--git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java index 4720880..3f05969 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java @@ -116,8 +116,11 @@ public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner { return false; } + /** + * @see org.apache.hadoop.hbase.regionserver.KeyValueScanner#getScannerOrder() + */ @Override - public long getSequenceID() { + public long getScannerOrder() { return 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index aff40c1..b030c74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.hbase.Cell; @@ -38,178 +38,105 @@ import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) public class TestKeyValueHeap extends HBaseTestCase { - private static final boolean PRINT = false; - - List scanners = new ArrayList(); - - private byte[] row1; - private byte[] fam1; - private byte[] col1; - private byte[] data; + private byte[] row1 = Bytes.toBytes("row1"); + private byte[] fam1 = Bytes.toBytes("fam1"); + private byte[] col1 = Bytes.toBytes("col1"); + private byte[] data = Bytes.toBytes("data"); + + private byte[] row2 = Bytes.toBytes("row2"); + private byte[] fam2 = Bytes.toBytes("fam2"); + private byte[] col2 = 
Bytes.toBytes("col2"); + + private byte[] col3 = Bytes.toBytes("col3"); + private byte[] col4 = Bytes.toBytes("col4"); + private byte[] col5 = Bytes.toBytes("col5"); + + // Variable name encoding: kv + row# + family# + column#, e.g. kv121 = (row1, fam2, col1). + Cell kv111 = new KeyValue(row1, fam1, col1, data); + Cell kv112 = new KeyValue(row1, fam1, col2, data); + Cell kv113 = new KeyValue(row1, fam1, col3, data); + Cell kv114 = new KeyValue(row1, fam1, col4, data); + Cell kv115 = new KeyValue(row1, fam1, col5, data); + Cell kv121 = new KeyValue(row1, fam2, col1, data); + Cell kv122 = new KeyValue(row1, fam2, col2, data); + Cell kv211 = new KeyValue(row2, fam1, col1, data); + Cell kv212 = new KeyValue(row2, fam1, col2, data); + Cell kv213 = new KeyValue(row2, fam1, col3, data); + + TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212)); + TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112)); + TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); + + List scanners = new ArrayList(Arrays.asList(s1, s2, s3)); + + /* + * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned + * Cells are same as {@code expected}. + * @return List of Cells returned from scanners. 
+ */ + public List assertCells(List expected, List scanners) + throws IOException { + //Creating KeyValueHeap + KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); - private byte[] row2; - private byte[] fam2; - private byte[] col2; + List actual = new ArrayList<>(); + while(kvh.peek() != null){ + actual.add(kvh.next()); + } - private byte[] col3; - private byte[] col4; - private byte[] col5; + assertEquals(expected, actual); + return actual; + } @Before public void setUp() throws Exception { super.setUp(); - data = Bytes.toBytes("data"); - row1 = Bytes.toBytes("row1"); - fam1 = Bytes.toBytes("fam1"); - col1 = Bytes.toBytes("col1"); - row2 = Bytes.toBytes("row2"); - fam2 = Bytes.toBytes("fam2"); - col2 = Bytes.toBytes("col2"); - col3 = Bytes.toBytes("col3"); - col4 = Bytes.toBytes("col4"); - col5 = Bytes.toBytes("col5"); } @Test public void testSorted() throws IOException{ //Cases that need to be checked are: - //1. The "smallest" KeyValue is in the same scanners as current + //1. The "smallest" Cell is in the same scanners as current //2. 
Current scanner gets empty - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - scanners.add(new Scanner(l1)); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - scanners.add(new Scanner(l2)); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - scanners.add(new Scanner(l3)); - - List expected = new ArrayList(); - expected.add(new KeyValue(row1, fam1, col1, data)); - expected.add(new KeyValue(row1, fam1, col2, data)); - expected.add(new KeyValue(row1, fam1, col3, data)); - expected.add(new KeyValue(row1, fam1, col4, data)); - expected.add(new KeyValue(row1, fam1, col5, data)); - expected.add(new KeyValue(row1, fam2, col1, data)); - expected.add(new KeyValue(row1, fam2, col2, data)); - expected.add(new KeyValue(row2, fam1, col1, data)); - expected.add(new KeyValue(row2, fam1, col2, data)); - expected.add(new KeyValue(row2, fam1, col3, data)); + List expected = Arrays.asList( + kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213); - //Creating KeyValueHeap - KeyValueHeap kvh = - new KeyValueHeap(scanners, CellComparator.COMPARATOR); - - List actual = new ArrayList(); - while(kvh.peek() != null){ - actual.add(kvh.next()); - } - - assertEquals(expected.size(), actual.size()); - for(int i=0; i actual = assertCells(expected, scanners); //Check if result is sorted according to Comparator for(int i=0; i l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - scanners.add(new Scanner(l1)); - - List l2 = new ArrayList(); - l2.add(new 
KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - scanners.add(new Scanner(l2)); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - scanners.add(new Scanner(l3)); - - List expected = new ArrayList(); - expected.add(new KeyValue(row2, fam1, col1, data)); + List expected = Arrays.asList(kv211); //Creating KeyValueHeap KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); - KeyValue seekKv = new KeyValue(row2, fam1, null, null); + Cell seekKv = new KeyValue(row2, fam1, null, null); kvh.seek(seekKv); - List actual = new ArrayList(); - actual.add(kvh.peek()); - - assertEquals(expected.size(), actual.size()); - for(int i=0; i actual = Arrays.asList(kvh.peek()); + assertEquals("Expected = " + Arrays.toString(expected.toArray()) + + "\n Actual = " + Arrays.toString(actual.toArray()), expected, actual); } @Test public void testScannerLeak() throws IOException { // Test for unclosed scanners (HBASE-1927) - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - Scanner s1 = new Scanner(l1); - scanners.add(s1); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - Scanner s2 = new Scanner(l2); - scanners.add(s2); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - Scanner s3 = new Scanner(l3); - scanners.add(s3); - - List l4 = new ArrayList(); - Scanner s4 = new Scanner(l4); + TestScanner s4 = new 
TestScanner(new ArrayList()); scanners.add(s4); //Creating KeyValueHeap @@ -225,7 +152,7 @@ public class TestKeyValueHeap extends HBaseTestCase { assertTrue(kvh.scannersForDelayedClose.contains(s4)); kvh.close(); for(KeyValueScanner scanner : scanners) { - assertTrue(((Scanner)scanner).isClosed()); + assertTrue(((TestScanner)scanner).isClosed()); } } @@ -233,38 +160,19 @@ public class TestKeyValueHeap extends HBaseTestCase { public void testScannerException() throws IOException { // Test for NPE issue when exception happens in scanners (HBASE-13835) - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - SeekScanner s1 = new SeekScanner(l1); - scanners.add(s1); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - SeekScanner s2 = new SeekScanner(l2); - scanners.add(s2); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - SeekScanner s3 = new SeekScanner(l3); - scanners.add(s3); - - List l4 = new ArrayList(); - SeekScanner s4 = new SeekScanner(l4); - scanners.add(s4); + TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212)); + TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112)); + TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); + TestScanner s4 = new SeekTestScanner(new ArrayList()); + + List scanners = new ArrayList(Arrays.asList(s1, s2, s3, s4)); // Creating KeyValueHeap KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); try { for (KeyValueScanner scanner : scanners) { - ((SeekScanner) scanner).setRealSeekDone(false); + ((SeekTestScanner) 
scanner).setRealSeekDone(false); } while (kvh.next() != null); // The pollRealKV should throw IOE. @@ -276,20 +184,47 @@ public class TestKeyValueHeap extends HBaseTestCase { // It implies there is no NPE thrown from kvh.close() if getting here for (KeyValueScanner scanner : scanners) { // Verify that close is called and only called once for each scanner - assertTrue(((SeekScanner) scanner).isClosed()); - assertEquals(((SeekScanner) scanner).getClosedNum(), 1); + assertTrue(((SeekTestScanner) scanner).isClosed()); + assertEquals(((SeekTestScanner) scanner).getClosedNum(), 1); + } + } + + @Test + public void testPriorityId() throws IOException { + Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa")); + Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb")); + { + TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1); + TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2); + List expected = Arrays.asList(kv111, kv112, kv113B, kv113A); + assertCells(expected, new ArrayList(Arrays.asList(scan1, scan2))); + } + { + TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2); + TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1); + List expected = Arrays.asList(kv111, kv112, kv113A, kv113B); + assertCells(expected, new ArrayList(Arrays.asList(scan1, scan2))); } } - private static class Scanner extends CollectionBackedScanner { - private Iterator iter; - private Cell current; + private static class TestScanner extends CollectionBackedScanner { private boolean closed = false; + private long scannerOrder = 0; - public Scanner(List list) { + public TestScanner(List list) { super(list); } + public TestScanner(List list, long scannerOrder) { + this(list); + this.scannerOrder = scannerOrder; + } + + @Override + public long getScannerOrder() { + return scannerOrder; + } + @Override public void close(){ closed = true; @@ -300,11 +235,11 @@ public class TestKeyValueHeap extends HBaseTestCase { } } - 
private static class SeekScanner extends Scanner { + private static class SeekTestScanner extends TestScanner { private int closedNum = 0; private boolean realSeekDone = true; - public SeekScanner(List list) { + public SeekTestScanner(List list) { super(list); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index d8acd44..ab0c173 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -211,7 +211,7 @@ public class TestStoreFile extends HBaseTestCase { when(hcd.getName()).thenReturn(cf); when(store.getFamily()).thenReturn(hcd); StoreFileScanner scanner = - new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0); + new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0); Scan scan = new Scan(); scan.setColumnFamilyTimeRange(cf, 0, 1); assertFalse(scanner.shouldUseScanner(scan, store, 0)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java index 7707116..670a8d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java @@ -64,7 +64,7 @@ public class TestCompactor { when(r.length()).thenReturn(1L); when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); - when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())) + when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong())) .thenReturn(mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); 
when(sf.createReader()).thenReturn(r); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 160deb3..d8770e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -750,7 +750,8 @@ public class TestStripeCompactionPolicy { when(r.length()).thenReturn(size); when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); - when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn( + when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong())) + .thenReturn( mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.createReader(anyBoolean())).thenReturn(r); -- 2.3.2 (Apple Git-55)