diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 69bfcdf..95deee6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -392,7 +392,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return 0; } - KeyValueScanner getCurrentForTesting() { + KeyValueScanner getCurrent() { return current; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 6474e96..9298719 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -401,6 +401,10 @@ public class StoreFileScanner implements KeyValueScanner { return true; } + public boolean isBulkLoaded() { + return this.reader.isBulkLoaded(); + } + // Test methods static final long getSeekCount() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7160b30..3276862 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -770,12 +770,20 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } /** + * Get current sub-scanner + * @return the current sub-scanner + */ + public KeyValueScanner getCurrent() { + return heap.getCurrent(); + } + + /** * Used in testing. * @return all scanners in no particular order */ List getAllScannersForTesting() { List allScanners = new ArrayList(); - KeyValueScanner current = heap.getCurrentForTesting(); + KeyValueScanner current = heap.getCurrent(); if (current != null) allScanners.add(current); for (KeyValueScanner scanner : heap.getHeap()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index fdc38c5..1a88e82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -139,6 +139,11 @@ public abstract class Compactor { if (tmp != null) { fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp)); } + // Get and set the real MVCCReadpoint for new bulkloaded files, which is the + // SeqId number. + if (r.isBulkLoaded()) { + fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID()); + } tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN); if (tmp != null) { fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp)); @@ -240,6 +245,19 @@ public abstract class Compactor { // output to writer: for (Cell c : kvs) { KeyValue kv = KeyValueUtil.ensureKeyValue(c); + + // For compaction, scanner is a StoreScanner and its sub-scanners + // are StoreFileScanner. + if (scanner instanceof StoreScanner) { + StoreFileScanner current = (StoreFileScanner) ((StoreScanner) scanner).getCurrent(); + // If the SeqId for this bulkloaded file is greater or equal smallestReadPoint, + // we need to keep this SeqId number into the compaction file. So that read + // consistency is maintained. + if (current != null && current.isBulkLoaded() + && current.getSequenceID() >= smallestReadPoint) { + kv.setSequenceId(current.getSequenceID()); + } + } if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) { kv.setSequenceId(0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java index 54e8517..56827bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java @@ -124,7 +124,7 @@ public class TestScanWithBloomError { KeyValueHeap storeHeap = scanner.getStoreHeapForTesting(); assertEquals(0, storeHeap.getHeap().size()); StoreScanner storeScanner = - (StoreScanner) storeHeap.getCurrentForTesting(); + (StoreScanner) storeHeap.getCurrent(); @SuppressWarnings({ "unchecked", "rawtypes" }) List scanners = (List) (List) storeScanner.getAllScannersForTesting();