From bef19a37a168f4771afe16d6e296b614a5a4c388 Mon Sep 17 00:00:00 2001 From: Apekshit Date: Tue, 23 Feb 2016 00:31:18 -0800 Subject: [PATCH] HBASE-15236 Inconsistent cell reads over multiple bulk-loaded HFiles. In KeyValueHeap, instead of using just seq id to determine priority order in case two Cells are same, we should order determined by StoreFile.Comparater.SEQ_ID. Also changes the name in KeyValueScanner from getSequenceId() to getPriorityId() because that is what it means now (and always meant in SegmentScanner). Testing: Adds unit test to TestKeyValueHeap. Manual testing: Three cases (Tables t, t2, t3 in the jira description), single region, 2 hfiles with same seq id, timestamps and duplicate KVs. Made sure that returned kv was same for get and scan. (Apekshit) --- .../regionserver/DefaultStoreFileManager.java | 1 + .../hadoop/hbase/regionserver/KeyValueHeap.java | 14 +- .../hadoop/hbase/regionserver/KeyValueScanner.java | 8 +- .../hadoop/hbase/regionserver/MemStoreScanner.java | 2 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 16 +- .../hadoop/hbase/regionserver/StoreFile.java | 46 ++-- .../hbase/regionserver/StoreFileScanner.java | 24 +- .../hadoop/hbase/regionserver/StoreScanner.java | 2 +- .../hadoop/hbase/util/CollectionBackedScanner.java | 2 +- .../hbase/regionserver/TestKeyValueHeap.java | 252 ++++++++------------- .../hadoop/hbase/regionserver/TestStoreFile.java | 2 +- .../hbase/regionserver/TestStripeCompactor.java | 2 +- .../compactions/TestStripeCompactionPolicy.java | 3 +- 13 files changed, 162 insertions(+), 212 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index d38306c..7f1e6e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -209,6 +209,7 @@ class DefaultStoreFileManager implements StoreFileManager { return expiredStoreFiles; } + // Order of sorted files: older to newer. private void sortAndSetStoreFiles(List storeFiles) { Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID); storefiles = ImmutableList.copyOf(storeFiles); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 89fc8fb..8a8517d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -190,16 +190,8 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return comparison; } else { // Since both the keys are exactly the same, we break the tie in favor - // of the key which came latest. - long leftSequenceID = left.getSequenceID(); - long rightSequenceID = right.getSequenceID(); - if (leftSequenceID > rightSequenceID) { - return -1; - } else if (leftSequenceID < rightSequenceID) { - return 1; - } else { - return 0; - } + // of higher priority. + return Long.compare(right.getPriorityId(), left.getPriorityId()); } } /** @@ -407,7 +399,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } @Override - public long getSequenceID() { + public long getPriorityId() { return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index eae713f..af7f0c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -66,12 +66,12 @@ public interface KeyValueScanner extends Shipper { boolean reseek(Cell key) throws IOException; /** - * Get the sequence id associated with this KeyValueScanner. This is required + * Get the priority associated with this KeyValueScanner. This is required * for comparing multiple files to find out which one has the latest data. - * The default implementation for this would be to return 0. A file having - * lower sequence id will be considered to be the older one. + * The default implementation for this would be to return 0. KeyValue from file with + * higher priority (higher value) will be used first. */ - long getSequenceID(); + long getPriorityId(); /** * Close the KeyValue scanner. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index dfcec25..e57ef73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -190,7 +190,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * always have the latest data among all files. */ @Override - public synchronized long getSequenceID() { + public synchronized long getPriorityId() { return Long.MAX_VALUE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 8852d5c..2252a30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.client.Scan; @InterfaceAudience.Private public abstract class SegmentScanner implements KeyValueScanner { - private long sequenceID = Long.MAX_VALUE; + private long priorityId = Long.MAX_VALUE; protected abstract Segment getSegment(); @@ -41,8 +41,8 @@ public abstract class SegmentScanner implements KeyValueScanner { * */ @Override - public long getSequenceID() { - return sequenceID; + public long getPriorityId() { + return priorityId; } /** @@ -124,12 +124,12 @@ public abstract class SegmentScanner implements KeyValueScanner { } /** - * Set the sequence id of the scanner. + * Set the priority id of the scanner. Higher value has higher priority. * This is used to determine an order between memory segment scanners. - * @param x a unique sequence id + * @param x a unique priority id */ - public void setSequenceID(long x) { - sequenceID = x; + public void setPriorityId(long x) { + priorityId = x; } /** @@ -144,7 +144,7 @@ public abstract class SegmentScanner implements KeyValueScanner { @Override public String toString() { String res = "Store segment scanner of type "+this.getClass().getName()+"; "; - res += "sequence id "+getSequenceID()+"; "; + res += "priority id " + getPriorityId() + "; "; res += getSegment().toString(); return res; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 4ced556..76262c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -258,6 +258,7 @@ public class StoreFile { this.fileInfo = other.fileInfo; this.cacheConf = other.cacheConf; this.cfBloomType = other.cfBloomType; + this.metadataMap = other.metadataMap; } /** @@ -384,7 +385,7 @@ public class StoreFile { if (startPos != -1) { bulkLoadedHFile = true; } - return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); + return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY)); } @VisibleForTesting @@ -1208,19 +1209,27 @@ public class StoreFile { } /** - * Get a scanner to scan over this StoreFile. Do not use - * this overload if using this scanner for compactions. + * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting + * {@code isCompaction} to false, {@code readPt} to 0 and {@code priority} to 0. + * Do not use this overload if using this scanner for compactions. * - * @param cacheBlocks should this scanner cache blocks? - * @param pread use pread (for highly concurrent small readers) - * @return a scanner + * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) + */ + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { + // 0 is passed as readpoint because this method is only used by test + // where StoreFile is directly operated upon + return getStoreFileScanner(cacheBlocks, pread, false, 0, 0); + } + + /** + * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting + * {@code priority} to 0. + * + * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, - boolean pread) { - return getStoreFileScanner(cacheBlocks, pread, false, - // 0 is passed as readpoint because this method is only used by test - // where StoreFile is directly operated upon - 0); + public StoreFileScanner getStoreFileScanner( + boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { + return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0); } /** @@ -1229,16 +1238,17 @@ public class StoreFile { * @param cacheBlocks should this scanner cache blocks? * @param pread use pread (for highly concurrent small readers) * @param isCompaction is scanner being used for compaction? + * @param priority Priority of this scanner relative to other scanners, when same keyvalue is + * present in both. * @return a scanner */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, - boolean pread, - boolean isCompaction, long readPt) { + public StoreFileScanner getStoreFileScanner( + boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long priority) { // Increment the ref count refCount.incrementAndGet(); - return new StoreFileScanner(this, - getScanner(cacheBlocks, pread, isCompaction), - !isCompaction, reader.hasMVCCInfo(), readPt); + return new StoreFileScanner( + this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), + readPt, priority); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 924e7f6..4abf7f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -23,8 +23,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.SettableSequenceId; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -66,17 +68,21 @@ public class StoreFileScanner implements KeyValueScanner { private long readPt; + // Priority of this scanner relative to other scanners when duplicate key-value is found. + private long priority; + /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param hfs HFile scanner */ public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, - boolean hasMVCC, long readPt) { + boolean hasMVCC, long readPt, long priority) { this.readPt = readPt; this.reader = reader; this.hfs = hfs; this.enforceMVCC = useMVCC; this.hasMVCCInfo = hasMVCC; + this.priority = priority; } boolean isPrimaryReplica() { @@ -116,11 +122,14 @@ public class StoreFileScanner implements KeyValueScanner { ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { List scanners = new ArrayList( files.size()); - for (StoreFile file : files) { - StoreFile.Reader r = file.createReader(canUseDrop); + + List sorted_files = new ArrayList<>(files); + Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID); + for (int i = 0; i < sorted_files.size(); i++) { + Reader r = sorted_files.get(i).createReader(); r.setReplicaStoreFile(isPrimaryReplica); StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, - isCompaction, readPt); + isCompaction, readPt, i); scanner.setScanQueryMatcher(matcher); scanners.add(scanner); } @@ -219,7 +228,8 @@ public class StoreFileScanner implements KeyValueScanner { protected void setCurrentCell(Cell newVal) throws IOException { this.cur = newVal; - if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) { + if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId() + && this.cur instanceof SettableSequenceId) { CellUtil.setSequenceId(cur, this.reader.getSequenceID()); } } @@ -305,8 +315,8 @@ public class StoreFileScanner implements KeyValueScanner { } @Override - public long getSequenceID() { - return reader.getSequenceID(); + public long getPriorityId() { + return priority; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 2f0d284..282f2f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -827,7 +827,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } @Override - public long getSequenceID() { + public long getPriorityId() { return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java index 9fc068f..d0a6df6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java @@ -118,7 +118,7 @@ public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner { } @Override - public long getSequenceID() { + public long getPriorityId() { return 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index aff40c1..29fa39d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -38,178 +39,105 @@ import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) public class TestKeyValueHeap extends HBaseTestCase { - private static final boolean PRINT = false; - - List scanners = new ArrayList(); - - private byte[] row1; - private byte[] fam1; - private byte[] col1; - private byte[] data; + private byte[] row1 = Bytes.toBytes("row1"); + private byte[] fam1 = Bytes.toBytes("fam1"); + private byte[] col1 = Bytes.toBytes("col1"); + private byte[] data = Bytes.toBytes("data"); + + private byte[] row2 = Bytes.toBytes("row2"); + private byte[] fam2 = Bytes.toBytes("fam2"); + private byte[] col2 = Bytes.toBytes("col2"); + + private byte[] col3 = Bytes.toBytes("col3"); + private byte[] col4 = Bytes.toBytes("col4"); + private byte[] col5 = Bytes.toBytes("col5"); + + // Variable name encoding. kv + Cell kv111 = new KeyValue(row1, fam1, col1, data); + Cell kv112 = new KeyValue(row1, fam1, col2, data); + Cell kv113 = new KeyValue(row1, fam1, col3, data); + Cell kv114 = new KeyValue(row1, fam1, col4, data); + Cell kv115 = new KeyValue(row1, fam1, col5, data); + Cell kv121 = new KeyValue(row1, fam2, col1, data); + Cell kv122 = new KeyValue(row1, fam2, col2, data); + Cell kv211 = new KeyValue(row2, fam1, col1, data); + Cell kv212 = new KeyValue(row2, fam1, col2, data); + Cell kv213 = new KeyValue(row2, fam1, col3, data); + + Scanner s1 = new Scanner(Arrays.asList(kv115, kv211, kv212)); + Scanner s2 = new Scanner(Arrays.asList(kv111, kv112)); + Scanner s3 = new Scanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); + + List scanners = new ArrayList(Arrays.asList(s1, s2, s3)); + + /* + * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned + * Cells are same as {@code expected}. + * @return List of Cells returned from scanners. + */ + public List assertCells(List expected, List scanners) + throws IOException { + //Creating KeyValueHeap + KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); - private byte[] row2; - private byte[] fam2; - private byte[] col2; + List actual = new ArrayList<>(); + while(kvh.peek() != null){ + actual.add(kvh.next()); + } - private byte[] col3; - private byte[] col4; - private byte[] col5; + assertEquals(expected, actual); + return actual; + } @Before public void setUp() throws Exception { super.setUp(); - data = Bytes.toBytes("data"); - row1 = Bytes.toBytes("row1"); - fam1 = Bytes.toBytes("fam1"); - col1 = Bytes.toBytes("col1"); - row2 = Bytes.toBytes("row2"); - fam2 = Bytes.toBytes("fam2"); - col2 = Bytes.toBytes("col2"); - col3 = Bytes.toBytes("col3"); - col4 = Bytes.toBytes("col4"); - col5 = Bytes.toBytes("col5"); } @Test public void testSorted() throws IOException{ //Cases that need to be checked are: - //1. The "smallest" KeyValue is in the same scanners as current + //1. The "smallest" Cell is in the same scanners as current //2. Current scanner gets empty - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - scanners.add(new Scanner(l1)); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - scanners.add(new Scanner(l2)); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - scanners.add(new Scanner(l3)); - - List expected = new ArrayList(); - expected.add(new KeyValue(row1, fam1, col1, data)); - expected.add(new KeyValue(row1, fam1, col2, data)); - expected.add(new KeyValue(row1, fam1, col3, data)); - expected.add(new KeyValue(row1, fam1, col4, data)); - expected.add(new KeyValue(row1, fam1, col5, data)); - expected.add(new KeyValue(row1, fam2, col1, data)); - expected.add(new KeyValue(row1, fam2, col2, data)); - expected.add(new KeyValue(row2, fam1, col1, data)); - expected.add(new KeyValue(row2, fam1, col2, data)); - expected.add(new KeyValue(row2, fam1, col3, data)); + List expected = Arrays.asList( + kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213); - //Creating KeyValueHeap - KeyValueHeap kvh = - new KeyValueHeap(scanners, CellComparator.COMPARATOR); - - List actual = new ArrayList(); - while(kvh.peek() != null){ - actual.add(kvh.next()); - } - - assertEquals(expected.size(), actual.size()); - for(int i=0; i actual = assertCells(expected, scanners); //Check if result is sorted according to Comparator for(int i=0; i l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - scanners.add(new Scanner(l1)); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - scanners.add(new Scanner(l2)); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - scanners.add(new Scanner(l3)); - - List expected = new ArrayList(); - expected.add(new KeyValue(row2, fam1, col1, data)); + List expected = Arrays.asList(kv211); //Creating KeyValueHeap KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); - KeyValue seekKv = new KeyValue(row2, fam1, null, null); + Cell seekKv = new KeyValue(row2, fam1, null, null); kvh.seek(seekKv); - List actual = new ArrayList(); - actual.add(kvh.peek()); - - assertEquals(expected.size(), actual.size()); - for(int i=0; i actual = Arrays.asList(kvh.peek()); + assertEquals("Expected = " + Arrays.toString(expected.toArray()) + + "\n Actual = " + Arrays.toString(actual.toArray()), expected, actual); } @Test public void testScannerLeak() throws IOException { // Test for unclosed scanners (HBASE-1927) - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - Scanner s1 = new Scanner(l1); - scanners.add(s1); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - Scanner s2 = new Scanner(l2); - scanners.add(s2); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - Scanner s3 = new Scanner(l3); - scanners.add(s3); - - List l4 = new ArrayList(); - Scanner s4 = new Scanner(l4); + Scanner s4 = new Scanner(new ArrayList()); scanners.add(s4); //Creating KeyValueHeap @@ -233,31 +161,12 @@ public class TestKeyValueHeap extends HBaseTestCase { public void testScannerException() throws IOException { // Test for NPE issue when exception happens in scanners (HBASE-13835) - List l1 = new ArrayList(); - l1.add(new KeyValue(row1, fam1, col5, data)); - l1.add(new KeyValue(row2, fam1, col1, data)); - l1.add(new KeyValue(row2, fam1, col2, data)); - SeekScanner s1 = new SeekScanner(l1); - scanners.add(s1); - - List l2 = new ArrayList(); - l2.add(new KeyValue(row1, fam1, col1, data)); - l2.add(new KeyValue(row1, fam1, col2, data)); - SeekScanner s2 = new SeekScanner(l2); - scanners.add(s2); - - List l3 = new ArrayList(); - l3.add(new KeyValue(row1, fam1, col3, data)); - l3.add(new KeyValue(row1, fam1, col4, data)); - l3.add(new KeyValue(row1, fam2, col1, data)); - l3.add(new KeyValue(row1, fam2, col2, data)); - l3.add(new KeyValue(row2, fam1, col3, data)); - SeekScanner s3 = new SeekScanner(l3); - scanners.add(s3); - - List l4 = new ArrayList(); - SeekScanner s4 = new SeekScanner(l4); - scanners.add(s4); + Scanner s1 = new SeekScanner(Arrays.asList(kv115, kv211, kv212)); + Scanner s2 = new SeekScanner(Arrays.asList(kv111, kv112)); + Scanner s3 = new SeekScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); + Scanner s4 = new SeekScanner(new ArrayList()); + + List scanners = new ArrayList(Arrays.asList(s1, s2, s3, s4)); // Creating KeyValueHeap KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); @@ -281,15 +190,42 @@ public class TestKeyValueHeap extends HBaseTestCase { } } + @Test + public void testPriorityId() throws IOException { + Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa")); + Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb")); + { + Scanner scan1 = new Scanner(Arrays.asList(kv111, kv112, kv113A), 1); + Scanner scan2 = new Scanner(Arrays.asList(kv113B), 2); + List expected = Arrays.asList(kv111, kv112, kv113B, kv113A); + assertCells(expected, new ArrayList(Arrays.asList(scan1, scan2))); + } + { + Scanner scan1 = new Scanner(Arrays.asList(kv111, kv112, kv113A), 2); + Scanner scan2 = new Scanner(Arrays.asList(kv113B), 1); + List expected = Arrays.asList(kv111, kv112, kv113A, kv113B); + assertCells(expected, new ArrayList(Arrays.asList(scan1, scan2))); + } + } + private static class Scanner extends CollectionBackedScanner { - private Iterator iter; - private Cell current; private boolean closed = false; + private long priority = 0; public Scanner(List list) { super(list); } + public Scanner(List list, long priority) { + this(list); + this.priority = priority; + } + + @Override + public long getPriorityId() { + return priority; + } + @Override public void close(){ closed = true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index e984c5d..8d24e53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -212,7 +212,7 @@ public class TestStoreFile extends HBaseTestCase { when(hcd.getName()).thenReturn(cf); when(store.getFamily()).thenReturn(hcd); StoreFileScanner scanner = - new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0); + new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0); Scan scan = new Scan(); scan.setColumnFamilyTimeRange(cf, 0, 1); assertFalse(scanner.shouldUseScanner(scan, store, 0)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index cb586f3..fd64dc3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -224,7 +224,7 @@ public class TestStripeCompactor { when(r.length()).thenReturn(1L); when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); - when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())) + when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong())) .thenReturn(mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.createReader()).thenReturn(r); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index c440a57..ff80793 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -735,7 +735,8 @@ public class TestStripeCompactionPolicy { when(r.length()).thenReturn(size); when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); - when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn( + when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyInt())) + .thenReturn( mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.createReader(anyBoolean())).thenReturn(r); -- 2.3.2 (Apple Git-55)