Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl; +import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -1469,7 +1469,7 @@ */ public static List getFromStoreFile(Store store, Get get) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(); + MultiVersionConsistencyControl.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName())); Index: src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (working copy) @@ -250,6 +250,12 @@ writers.add(writer); ctx.addThread(writer); } + // Add a flusher + ctx.addThread(new RepeatingTestThread(ctx) { + public void doAnAction() throws Exception { + util.flush(); + } + }); List getters = Lists.newArrayList(); for (int i = 0; i < numGetters; i++) { @@ -286,7 +292,6 @@ } @Test - @Ignore("Currently not passing - see HBASE-2856") public void testGetAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -297,7 +302,6 @@ } @Test - @Ignore("Currently not passing - see HBASE-2670") public void testScanAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -308,7 +312,6 @@ } @Test - @Ignore("Currently not passing - see HBASE-2670") public void testMixedAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -322,7 +325,7 @@ Configuration c = HBaseConfiguration.create(); TestAcidGuarantees test = new TestAcidGuarantees(); test.setConf(c); - test.runTestAtomicity(5*60*1000, 5, 2, 2, 3); + test.runTestAtomicity(5000, 50, 2, 2, 3); } private void setConf(Configuration c) { Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (working copy) @@ -28,12 +28,12 @@ public class TestReadWriteConsistencyControl extends TestCase { static class Writer implements Runnable { final AtomicBoolean finished; - final ReadWriteConsistencyControl rwcc; + final MultiVersionConsistencyControl mvcc; final AtomicBoolean status; - Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) { + Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) { this.finished = finished; - this.rwcc = rwcc; + this.mvcc = mvcc; this.status = status; } private Random rnd = new Random(); @@ -41,7 +41,7 @@ public void run() { while (!finished.get()) { - 
ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert(); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500); @@ -53,7 +53,7 @@ } catch (InterruptedException e1) { } try { - rwcc.completeMemstoreInsert(e); + mvcc.completeMemstoreInsert(e); } catch (RuntimeException ex) { // got failure System.out.println(ex.toString()); @@ -67,7 +67,7 @@ } public void testParallelism() throws Exception { - final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); final AtomicBoolean finished = new AtomicBoolean(false); @@ -76,9 +76,9 @@ final AtomicLong failedAt = new AtomicLong(); Runnable reader = new Runnable() { public void run() { - long prev = rwcc.memstoreReadPoint(); + long prev = mvcc.memstoreReadPoint(); while (!finished.get()) { - long newPrev = rwcc.memstoreReadPoint(); + long newPrev = mvcc.memstoreReadPoint(); if (newPrev < prev) { // serious problem. System.out.println("Reader got out of order, prev: " + @@ -100,7 +100,7 @@ for (int i = 0 ; i < n ; ++i ) { statuses[i] = new AtomicBoolean(true); - writers[i] = new Thread(new Writer(finished, rwcc, statuses[i])); + writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); writers[i].start(); } readThread.start(); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java (working copy) @@ -123,7 +123,7 @@ StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf, BloomType.NONE); List scanners = StoreFileScanner.getScannersForStoreFiles( - Collections.singletonList(sf), false, true); + Collections.singletonList(sf), false, true, false); KeyValueScanner scanner = scanners.get(0); FaultyInputStream inStream = fs.inStreams.get(0).get(); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (working copy) @@ -54,7 +54,7 @@ long timestamp = 0; //"Match" for(byte [] col : scannerColumns){ - result.add(exp.checkColumn(col, 0, col.length, ++timestamp)); + result.add(exp.checkColumn(col, 0, col.length, ++timestamp, false)); } assertEquals(expected.size(), result.size()); @@ -80,9 +80,9 @@ List expected = new ArrayList(); expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col1 expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2 - expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3 + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3 expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4 - expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5 + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5 int maxVersions = 1; //Create "Scanner" @@ -166,13 +166,13 @@ Long.MAX_VALUE); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1); + 
explicit.checkColumn(col, 0, col.length, 1, false); } explicit.update(); for (int i = 1; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1); + explicit.checkColumn(col, 0, col.length, 1, false); } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (working copy) @@ -54,7 +54,7 @@ for(byte [] qualifier : qualifiers) { ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0, - qualifier.length, 1); + qualifier.length, 1, false); actual.add(mc); } @@ -87,7 +87,7 @@ long timestamp = 0; for(byte [] qualifier : qualifiers) { MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length, - ++timestamp); + ++timestamp, false); actual.add(mc); } @@ -110,7 +110,7 @@ try { for(byte [] qualifier : qualifiers) { - tracker.checkColumn(qualifier, 0, qualifier.length, 1); + tracker.checkColumn(qualifier, 0, qualifier.length, 1, false); } } catch (Exception e) { ok = true; Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) @@ -26,8 +26,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -39,7 +37,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -57,12 +54,12 @@ private static final byte [] CONTENTS = Bytes.toBytes("contents"); private static final byte [] BASIC = Bytes.toBytes("basic"); private static final String CONTENTSTR = "contentstr"; - private ReadWriteConsistencyControl rwcc; + private MultiVersionConsistencyControl mvcc; @Override public void setUp() throws Exception { super.setUp(); - this.rwcc = new ReadWriteConsistencyControl(); + this.mvcc = new MultiVersionConsistencyControl(); this.memstore = new MemStore(); } @@ -88,7 +85,7 @@ List memstorescanners = this.memstore.getScanners(); Scan scan = new Scan(); List result = new ArrayList(); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, this.memstore.comparator, null, memstorescanners); int count = 0; @@ -108,7 +105,7 @@ scanner.close(); } - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. 
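// (Each of these scans starts with MultiVersionConsistencyControl.resetThreadReadPoint(mvcc):
// the read point is a thread-local in the new class, so a thread that is about to scan
// pins it to the instance's committed read point before opening scanners; the memstore
// scanner consults that thread-local value to decide which KeyValues are visible.)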
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, @@ -200,7 +197,7 @@ private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); List memstorescanners = this.memstore.getScanners(); assertEquals(1, memstorescanners.size()); final KeyValueScanner scanner = memstorescanners.get(0); @@ -235,35 +232,35 @@ final byte[] q2 = Bytes.toBytes("q2"); final byte[] v = Bytes.toBytes("value"); - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setMemstoreTS(w.getWriteNumber()); memstore.add(kv1); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{}); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setMemstoreTS(w.getWriteNumber()); memstore.add(kv2); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1, kv2}); } @@ -283,8 +280,8 @@ final byte[] v2 = Bytes.toBytes("value2"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMemstoreTS(w.getWriteNumber()); @@ -293,15 +290,15 @@ KeyValue kv12 = new KeyValue(row, f, q2, v1); kv12.setMemstoreTS(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setMemstoreTS(w.getWriteNumber()); memstore.add(kv21); @@ -311,17 +308,17 @@ memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE INSERT 2 - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. 
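// A minimal sketch of the write/read protocol these memstore tests exercise, built only
// from MVCC and MemStore calls that appear elsewhere in this patch; the *Sketch names and
// the byte[] values are placeholders, not part of the patch itself:
byte[] sketchRow = Bytes.toBytes("r");
byte[] sketchFam = Bytes.toBytes("f");
byte[] sketchQual = Bytes.toBytes("q");
byte[] sketchVal = Bytes.toBytes("value");
MultiVersionConsistencyControl mvccSketch = new MultiVersionConsistencyControl();
MemStore memstoreSketch = new MemStore();
MultiVersionConsistencyControl.WriteEntry we = mvccSketch.beginMemstoreInsert();
KeyValue taggedKv = new KeyValue(sketchRow, sketchFam, sketchQual, sketchVal);
taggedKv.setMemstoreTS(we.getWriteNumber());   // tag the edit with its write number
memstoreSketch.add(taggedKv);
mvccSketch.completeMemstoreInsert(we);         // commit: the read point advances past we
MultiVersionConsistencyControl.resetThreadReadPoint(mvccSketch);
KeyValueScanner sketchScanner = memstoreSketch.getScanners().get(0);
// sketchScanner now returns taggedKv; a read point taken before the commit would skip it.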
// See HBASE-1485 for discussion about what we should do with // the duplicate-TS inserts - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); } @@ -338,8 +335,8 @@ final byte[] q2 = Bytes.toBytes("q2"); final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMemstoreTS(w.getWriteNumber()); @@ -348,30 +345,30 @@ KeyValue kv12 = new KeyValue(row, f, q2, v1); kv12.setMemstoreTS(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setMemstoreTS(w.getWriteNumber()); memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE DELETE - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // NOW WE SHOULD SEE DELETE - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); } @@ -385,7 +382,7 @@ final byte[] f = Bytes.toBytes("family"); final byte[] q1 = Bytes.toBytes("q1"); - final ReadWriteConsistencyControl rwcc; + final MultiVersionConsistencyControl mvcc; final MemStore memstore; AtomicReference caughtException; @@ -393,10 +390,10 @@ public ReadOwnWritesTester(int id, MemStore memstore, - ReadWriteConsistencyControl rwcc, + MultiVersionConsistencyControl mvcc, AtomicReference caughtException) { - this.rwcc = rwcc; + this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); @@ -412,8 +409,8 @@ private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); @@ -421,10 +418,10 @@ KeyValue kv = new KeyValue(row, f, q1, i, v); kv.setMemstoreTS(w.getWriteNumber()); memstore.add(kv); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // Assert that we can read back - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); s.seek(kv); @@ -445,7 +442,7 @@ AtomicReference caught = new AtomicReference(); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught); + threads[i] = new 
ReadOwnWritesTester(i, memstore, mvcc, caught); threads[i].start(); } @@ -533,6 +530,7 @@ * @throws InterruptedException */ public void testGetNextRow() throws Exception { + MultiVersionConsistencyControl.resetThreadReadPoint(); addRows(this.memstore); // Add more versions to make it a little more interesting. Thread.sleep(1); @@ -946,7 +944,7 @@ } public static void main(String [] args) throws IOException { - ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); MemStore ms = new MemStore(); long n1 = System.nanoTime(); @@ -956,7 +954,7 @@ System.out.println("foo"); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); for (int i = 0 ; i < 50 ; i++) doScan(ms, i); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -1446,12 +1446,12 @@ scan.addFamily(fam2); scan.addFamily(fam4); is = (RegionScannerImpl) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); scan = new Scan(); is = (RegionScannerImpl) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(families.length -1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (working copy) @@ -754,7 +754,8 @@ for (int i=numKVs;i>0;i--) { KeyValue kv = new KeyValue(b, b, b, i, b); kvs.add(kv); - totalSize += kv.getLength(); + // kv has memstoreTS 0, which takes 1 byte to store. 
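// (In the v2 HFile format each key/value pair is now followed by its memstoreTS encoded
// as a vlong -- see the WritableUtils.readVLong() call in TestHFileWriterV2 below -- and
// a memstoreTS of 0 occupies exactly one byte, since WritableUtils.getVIntSize(0L) == 1;
// hence the extra "+ 1" per KeyValue in this size calculation.)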
+ totalSize += kv.getLength() + 1; } int blockSize = totalSize / numBlocks; StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize, Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (working copy) @@ -184,9 +184,10 @@ } LOG.info("Block count by type: " + blockCountByType); + String countByType = blockCountByType.toString(); assertEquals( - "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", - blockCountByType.toString()); + "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", + countByType); reader.close(); } Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (revision 1204112) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (working copy) @@ -22,6 +22,8 @@ import static org.junit.Assert.*; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -36,8 +38,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.junit.Before; import org.junit.Test; @@ -70,9 +75,6 @@ HFileWriterV2 writer = new HFileWriterV2(conf, new CacheConfig(conf), fs, hfilePath, 4096, COMPRESS_ALGO, KeyValue.KEY_COMPARATOR); - long totalKeyLength = 0; - long totalValueLength = 0; - Random rand = new Random(9713312); // Just a fixed seed. final int ENTRY_COUNT = 10000; @@ -86,9 +88,6 @@ byte[] valueBytes = randomValue(rand); writer.append(keyBytes, valueBytes); - totalKeyLength += keyBytes.length; - totalValueLength += valueBytes.length; - keys.add(keyBytes); values.add(valueBytes); } @@ -115,10 +114,36 @@ HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize); + // Comparator class name is stored in the trailer in version 2. + RawComparator comparator = trailer.createComparator(); + HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator, + trailer.getNumDataIndexLevels()); + HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader( + Bytes.BYTES_RAWCOMPARATOR, 1); + HFileBlock.BlockIterator blockIter = blockReader.blockRange( + trailer.getLoadOnOpenDataOffset(), + fileSize - trailer.getTrailerSize()); + // Data index. We also read statistics about the block index written after + // the root level. + dataBlockIndexReader.readMultiLevelIndexRoot( + blockIter.nextBlockAsStream(BlockType.ROOT_INDEX), + trailer.getDataIndexCount()); + + // Meta index. 
+ metaBlockIndexReader.readRootIndex( + blockIter.nextBlockAsStream(BlockType.ROOT_INDEX), + trailer.getMetaIndexCount()); + // File info + FileInfo fileInfo = new FileInfo(); + fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); + boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0); + // Counters for the number of key/value pairs and the number of blocks int entriesRead = 0; int blocksRead = 0; + long memstoreTS = 0; // Scan blocks the way the reader would scan them fsdis.seek(0); @@ -137,6 +162,15 @@ byte[] value = new byte[valueLen]; buf.get(value); + if (includeMemstoreTS) { + ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(), + buf.arrayOffset() + buf.position(), buf.remaining()); + DataInputStream data_input = new DataInputStream(byte_input); + + memstoreTS = WritableUtils.readVLong(data_input); + buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS)); + } + // A brute-force check to see that all keys and values are correct. assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0); assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0); Index: src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (working copy) @@ -47,12 +47,15 @@ * @param offset * @param length * @param ttl The timeToLive to enforce. + * @param ignoreCount indicates if the KV needs to be excluded while counting + * (used during compactions. We only count KV's that are older than all the + * scanners' read points.) * @return The match code instance. * @throws IOException in case there is an internal consistency problem * caused by a data corruption. */ public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long ttl) throws IOException; + int length, long ttl, boolean ignoreCount) throws IOException; /** * Updates internal variables in between files Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (working copy) @@ -1,169 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.LinkedList; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; - -/** - * Manages the read/write consistency within memstore. This provides - * an interface for readers to determine what entries to ignore, and - * a mechanism for writers to obtain new write numbers, then "commit" - * the new writes for readers to read (thus forming atomic transactions). - */ -public class ReadWriteConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - - private final Object readWaiters = new Object(); - - // This is the pending queue of writes. - private final LinkedList writeQueue = - new LinkedList(); - - private static final ThreadLocal perThreadReadPoint = - new ThreadLocal(); - - /** - * Get this thread's read point. Used primarily by the memstore scanner to - * know which values to skip (ie: have not been completed/committed to - * memstore). - */ - public static long getThreadReadPoint() { - return perThreadReadPoint.get(); - } - - /** - * Set the thread read point to the given value. The thread RWCC - * is used by the Memstore scanner so it knows which values to skip. - * Give it a value of 0 if you want everything. - */ - public static void setThreadReadPoint(long readPoint) { - perThreadReadPoint.set(readPoint); - } - - /** - * Set the thread RWCC read point to whatever the current read point is in - * this particular instance of RWCC. Returns the new thread read point value. - */ - public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { - perThreadReadPoint.set(rwcc.memstoreReadPoint()); - return getThreadReadPoint(); - } - - /** - * Set the thread RWCC read point to 0 (include everything). - */ - public static void resetThreadReadPoint() { - perThreadReadPoint.set(0L); - } - - public WriteEntry beginMemstoreInsert() { - synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); - writeQueue.add(e); - return e; - } - } - - public void completeMemstoreInsert(WriteEntry e) { - synchronized (writeQueue) { - e.markCompleted(); - - long nextReadValue = -1; - boolean ranOnce=false; - while (!writeQueue.isEmpty()) { - ranOnce=true; - WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } - - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); - writeQueue.removeFirst(); - } else { - break; - } - } - - if (!ranOnce) { - throw new RuntimeException("never was a first"); - } - - if (nextReadValue > 0) { - synchronized (readWaiters) { - memstoreRead = nextReadValue; - readWaiters.notifyAll(); - } - - } - } - - boolean interrupted = false; - synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { - try { - readWaiters.wait(0); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. cleanup --and then - // on our way out, reset the interrupt flag. 
- interrupted = true; - } - } - } - if (interrupted) Thread.currentThread().interrupt(); - } - - public long memstoreReadPoint() { - return memstoreRead; - } - - - public static class WriteEntry { - private long writeNumber; - private boolean completed = false; - WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; - } - void markCompleted() { - this.completed = true; - } - boolean isCompleted() { - return this.completed; - } - long getWriteNumber() { - return this.writeNumber; - } - } - - public static final long FIXED_SIZE = ClassSize.align( - ClassSize.OBJECT + - 2 * Bytes.SIZEOF_LONG + - 2 * ClassSize.REFERENCE); - -} Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -77,7 +77,7 @@ public ScanQueryMatcher(Scan scan, byte [] family, NavigableSet columns, long ttl, KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions, - boolean retainDeletesInOutput) { + boolean retainDeletesInOutput, long readPointToUse) { this.tr = scan.getTimeRange(); this.rowComparator = rowComparator; this.deletes = new ScanDeleteTracker(); @@ -85,6 +85,7 @@ this.startKey = KeyValue.createFirstOnRow(scan.getStartRow()); this.filter = scan.getFilter(); this.retainDeletesInOutput = retainDeletesInOutput; + this.maxReadPointToTrackVersions = readPointToUse; // Single branch to deal with two types of reads (columns vs all in family) if (columns == null || columns.size() == 0) { @@ -105,7 +106,7 @@ /* By default we will not include deletes */ /* deletes are included explicitly (for minor compaction) */ this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions, - false); + false, Long.MAX_VALUE /* max Readpoint to track versions */); } public ScanQueryMatcher(Scan scan, byte [] family, NavigableSet columns, long ttl, @@ -113,6 +114,9 @@ this(scan, family, columns, ttl, rowComparator, 0, maxVersions); } + /** readPoint over which the KVs are unconditionally included */ + protected long maxReadPointToTrackVersions; + /** * Determines if the caller should do one of several things: * - seek/skip to the next row (MatchCode.SEEK_NEXT_ROW) @@ -229,7 +233,8 @@ } } - MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp); + MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, + timestamp, kv.getMemstoreTS() > maxReadPointToTrackVersions); /* * According to current implementation, colChecker can only be * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. 
Therefore, always return Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -44,14 +44,16 @@ private final StoreFile.Reader reader; private final HFileScanner hfs; private KeyValue cur = null; + private boolean enforceMVCC = false; /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param hfs HFile scanner */ - public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) { + public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC) { this.reader = reader; this.hfs = hfs; + this.enforceMVCC = useMVCC; } /** @@ -62,11 +64,20 @@ Collection filesToCompact, boolean cacheBlocks, boolean usePread) throws IOException { - List scanners = - new ArrayList(filesToCompact.size()); - for (StoreFile file : filesToCompact) { + return getScannersForStoreFiles(filesToCompact, cacheBlocks, usePread, false); + } + + /** + * Return an array of scanners corresponding to the given set of store files. + */ + public static List getScannersForStoreFiles( + Collection files, boolean cacheBlocks, boolean usePread, + boolean isCompaction) throws IOException { + List scanners = new ArrayList( + files.size()); + for (StoreFile file : files) { StoreFile.Reader r = file.createReader(); - scanners.add(r.getStoreFileScanner(cacheBlocks, usePread)); + scanners.add(r.getStoreFileScanner(cacheBlocks, usePread, isCompaction)); } return scanners; } @@ -81,11 +92,13 @@ public KeyValue next() throws IOException { KeyValue retKey = cur; + try { // only seek if we aren't at the end. cur == null implies 'end'. if (cur != null) { hfs.next(); cur = hfs.getKeyValue(); + skipKVsNewerThanReadpoint(); } } catch(IOException e) { throw new IOException("Could not iterate " + this, e); @@ -100,7 +113,7 @@ return false; } cur = hfs.getKeyValue(); - return true; + return skipKVsNewerThanReadpoint(); } catch(IOException ioe) { throw new IOException("Could not seek " + this, ioe); } @@ -113,12 +126,41 @@ return false; } cur = hfs.getKeyValue(); - return true; + return skipKVsNewerThanReadpoint(); } catch (IOException ioe) { throw new IOException("Could not seek " + this, ioe); } } + protected boolean skipKVsNewerThanReadpoint() throws IOException { + long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); + + // We want to ignore all key-values that are newer than our current + // readPoint + while(enforceMVCC + && cur != null + && (cur.getMemstoreTS() > readPoint)) { + hfs.next(); + cur = hfs.getKeyValue(); + } + + if (cur == null) { + close(); + return false; + } + + // For the optimisation in HBASE-4346, we set the KV's memstoreTS to + // 0, if it is older than all the scanners' read points. It is possible + // that a newer KV's memstoreTS was reset to 0. But, there is an + // older KV which was not reset to 0 (because it was + // not old enough during flush). Make sure that we set it correctly now, + // so that the comparision order does not change. + if (cur.getMemstoreTS() <= readPoint) { + cur.setMemstoreTS(0); + } + return true; + } + public void close() { // Nothing to close on HFileScanner? 
cur = null; Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -67,7 +67,7 @@ matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), columns, store.ttl, store.comparator.getRawComparator(), store.minVersions, store.versionsToReturn(scan.getMaxVersions()), - false); + false, Long.MAX_VALUE); this.isGet = scan.isGetScan(); // pass columns = try to filter out unnecessary ScanFiles @@ -96,16 +96,18 @@ * @param store who we scan * @param scan the spec * @param scanners ancilliary scanners + * @param smallestReadPoint the readPoint that we should use for tracking versions + * @param retainDeletesInOutput should we retain deletes after compaction? */ StoreScanner(Store store, Scan scan, List scanners, - boolean retainDeletesInOutput) + boolean retainDeletesInOutput, long smallestReadPoint) throws IOException { this.store = store; this.cacheBlocks = false; this.isGet = false; matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), null, store.ttl, store.comparator.getRawComparator(), store.minVersions, - store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput); + store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput, smallestReadPoint); // Seek all scanners to the initial key for(KeyValueScanner scanner : scanners) { @@ -126,7 +128,8 @@ this.isGet = false; this.cacheBlocks = scan.getCacheBlocks(); this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, - comparator.getRawComparator(), 0, scan.getMaxVersions(), false); + comparator.getRawComparator(), 0, scan.getMaxVersions(), false, + Long.MAX_VALUE); // Seek all scanners to the initial key for(KeyValueScanner scanner : scanners) { @@ -139,19 +142,7 @@ * @return List of scanners ordered properly. */ private List getScanners() throws IOException { - // First the store file scanners - - // TODO this used to get the store files in descending order, - // but now we get them in ascending order, which I think is - // actually more correct, since memstore get put at the end. 
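// (Both this private getScanners() and the filtered variant just below now delegate to
// the new Store.getScanners(cacheBlocks, isGet, isCompaction), which takes the store's
// read lock and gathers the store file scanners and the memstore scanners in one place.)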
- List sfScanners = StoreFileScanner - .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet); - List scanners = - new ArrayList(sfScanners.size()+1); - scanners.addAll(sfScanners); - // Then the memstore scanners - scanners.addAll(this.store.memstore.getScanners()); - return scanners; + return this.store.getScanners(cacheBlocks, isGet, false); } /* @@ -169,24 +160,27 @@ memOnly = false; filesOnly = false; } - List scanners = new LinkedList(); - // First the store file scanners - if (memOnly == false) { - List sfScanners = StoreFileScanner - .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet); + List allStoreScanners = + this.store.getScanners(cacheBlocks, isGet, false); - // include only those scan files which pass all filters - for (StoreFileScanner sfs : sfScanners) { - if (sfs.shouldSeek(scan, columns)) { - scanners.add(sfs); + List scanners = + new ArrayList(allStoreScanners.size()); + + // include only those scan files which pass all filters + for (KeyValueScanner kvs : allStoreScanners) { + if (kvs instanceof StoreFileScanner) { + if (memOnly == false + && ((StoreFileScanner) kvs).shouldSeek(scan, columns)) { + scanners.add(kvs); } + } else { + // kvs is a MemStoreScanner + if (filesOnly == false && this.store.memstore.shouldSeek(scan)) { + scanners.add(kvs); + } } } - // Then the memstore scanners - if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) { - scanners.addAll(this.store.memstore.getScanners()); - } return scanners; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (working copy) @@ -106,7 +106,7 @@ * @return MatchCode telling ScanQueryMatcher what action to take */ public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long timestamp) { + int length, long timestamp, boolean ignoreCount) { do { // No more columns left, we are done with this query if(this.columns.size() == 0) { @@ -125,6 +125,8 @@ // Column Matches. If it is not a duplicate key, increment the version count // and include. if(ret == 0) { + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + //If column matches, check if it is a duplicate timestamp if (sameAsPreviousTS(timestamp)) { //If duplicate, skip this Key Index: src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java (revision 0) @@ -0,0 +1,210 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.LinkedList; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +/** + * Manages the read/write consistency within memstore. This provides + * an interface for readers to determine what entries to ignore, and + * a mechanism for writers to obtain new write numbers, then "commit" + * the new writes for readers to read (thus forming atomic transactions). + */ +public class MultiVersionConsistencyControl { + private volatile long memstoreRead = 0; + private volatile long memstoreWrite = 0; + + private final Object readWaiters = new Object(); + + // This is the pending queue of writes. + private final LinkedList writeQueue = + new LinkedList(); + + private static final ThreadLocal perThreadReadPoint = + new ThreadLocal() { + @Override + protected + Long initialValue() { + return Long.MAX_VALUE; + } + }; + + /** + * Default constructor. Initializes the memstoreRead/Write points to 0. + */ + public MultiVersionConsistencyControl() { + this.memstoreRead = this.memstoreWrite = 0; + } + + /** + * Initializes the memstoreRead/Write points appropriately. + * @param startPoint + */ + public void initialize(long startPoint) { + synchronized (writeQueue) { + if (this.memstoreWrite != this.memstoreRead) { + throw new RuntimeException("Already used this mvcc. Too late to initialize"); + } + + this.memstoreRead = this.memstoreWrite = startPoint; + } + } + + /** + * Get this thread's read point. Used primarily by the memstore scanner to + * know which values to skip (ie: have not been completed/committed to + * memstore). + */ + public static long getThreadReadPoint() { + return perThreadReadPoint.get(); + } + + /** + * Set the thread read point to the given value. The thread MVCC + * is used by the Memstore scanner so it knows which values to skip. + * Give it a value of 0 if you want everything. + */ + public static void setThreadReadPoint(long readPoint) { + perThreadReadPoint.set(readPoint); + } + + /** + * Set the thread MVCC read point to whatever the current read point is in + * this particular instance of MVCC. Returns the new thread read point value. + */ + public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) { + perThreadReadPoint.set(mvcc.memstoreReadPoint()); + return getThreadReadPoint(); + } + + /** + * Set the thread MVCC read point to 0 (include everything). 
+ */ + public static void resetThreadReadPoint() { + perThreadReadPoint.set(0L); + } + + public WriteEntry beginMemstoreInsert() { + synchronized (writeQueue) { + long nextWriteNumber = ++memstoreWrite; + WriteEntry e = new WriteEntry(nextWriteNumber); + writeQueue.add(e); + return e; + } + } + + public void completeMemstoreInsert(WriteEntry e) { + advanceMemstore(e); + waitForRead(e); + } + + boolean advanceMemstore(WriteEntry e) { + synchronized (writeQueue) { + e.markCompleted(); + + long nextReadValue = -1; + boolean ranOnce=false; + while (!writeQueue.isEmpty()) { + ranOnce=true; + WriteEntry queueFirst = writeQueue.getFirst(); + + if (nextReadValue > 0) { + if (nextReadValue+1 != queueFirst.getWriteNumber()) { + throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " + + nextReadValue + " next: " + queueFirst.getWriteNumber()); + } + } + + if (queueFirst.isCompleted()) { + nextReadValue = queueFirst.getWriteNumber(); + writeQueue.removeFirst(); + } else { + break; + } + } + + if (!ranOnce) { + throw new RuntimeException("never was a first"); + } + + if (nextReadValue > 0) { + synchronized (readWaiters) { + memstoreRead = nextReadValue; + readWaiters.notifyAll(); + } + } + if (memstoreRead >= e.getWriteNumber()) { + return true; + } + return false; + } + } + + /** + * Wait for the global readPoint to advance upto + * the specified transaction number. + */ + public void waitForRead(WriteEntry e) { + boolean interrupted = false; + synchronized (readWaiters) { + while (memstoreRead < e.getWriteNumber()) { + try { + readWaiters.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. + interrupted = true; + } + } + } + if (interrupted) Thread.currentThread().interrupt(); + } + + public long memstoreReadPoint() { + return memstoreRead; + } + + + public static class WriteEntry { + private long writeNumber; + private boolean completed = false; + WriteEntry(long writeNumber) { + this.writeNumber = writeNumber; + } + void markCompleted() { + this.completed = true; + } + boolean isCompleted() { + return this.completed; + } + long getWriteNumber() { + return this.writeNumber; + } + } + + public static final long FIXED_SIZE = ClassSize.align( + ClassSize.OBJECT + + 2 * Bytes.SIZEOF_LONG + + 2 * ClassSize.REFERENCE); + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (working copy) @@ -66,19 +66,23 @@ * @param offset * @param length * @param timestamp + * @param ignoreCount * @return The match code instance. */ @Override public MatchCode checkColumn(byte[] bytes, int offset, int length, - long timestamp) throws IOException { + long timestamp, boolean ignoreCount) throws IOException { if (columnBuffer == null) { // first iteration. 
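// (ignoreCount is the new flag threaded through from ScanQueryMatcher: it is true only
// when a KV's memstoreTS is above maxReadPointToTrackVersions, i.e. during compactions
// for KVs that some open scanner may still need, so the tracker includes the KV without
// letting it consume a version slot.)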
resetBuffer(bytes, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; return checkVersion(++currentCount, timestamp); } int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer, columnOffset, columnLength); if (cmp == 0) { + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + //If column matches, check if it is a duplicate timestamp if (sameAsPreviousTS(timestamp)) { return ScanQueryMatcher.MatchCode.SKIP; @@ -92,6 +96,7 @@ if (cmp > 0) { // switched columns, lets do something.x resetBuffer(bytes, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; return checkVersion(++currentCount, timestamp); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -641,6 +641,10 @@ private Iterator kvsetIt; private Iterator snapshotIt; + // The kvset and snapshot at the time of creating this scanner + volatile KeyValueSkipListSet kvsetAtCreation; + volatile KeyValueSkipListSet snapshotAtCreation; + // Sub lists on which we're iterating private SortedSet kvTail; private SortedSet snapshotTail; @@ -671,10 +675,13 @@ MemStoreScanner() { super(); + + kvsetAtCreation = kvset; + snapshotAtCreation = snapshot; } protected KeyValue getNext(Iterator it) { - long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); while (it.hasNext()) { KeyValue v = it.next(); @@ -702,8 +709,8 @@ // kvset and snapshot will never be null. // if tailSet can't find anything, SortedSet is empty (not null). - kvTail = kvset.tailSet(key); - snapshotTail = snapshot.tailSet(key); + kvTail = kvsetAtCreation.tailSet(key); + snapshotTail = snapshotAtCreation.tailSet(key); return seekInSubLists(key); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -205,7 +205,30 @@ final Path regiondir; KeyValue.KVComparator comparator; + private ConcurrentHashMap scannerReadPoints; + /* + * @return The smallest mvcc readPoint across all the scanners in this + * region. Writes older than this readPoint, are included in every + * read operation. + */ + public long getSmallestReadPoint() { + long minimumReadPoint; + // We need to ensure that while we are calculating the smallestReadPoint + // no new RegionScanners can grab a readPoint that we are unaware of. + // We achieve this by synchronizing on the scannerReadPoints object. + synchronized(scannerReadPoints) { + minimumReadPoint = mvcc.memstoreReadPoint(); + + for (Long readPoint: this.scannerReadPoints.values()) { + if (readPoint < minimumReadPoint) { + minimumReadPoint = readPoint; + } + } + } + return minimumReadPoint; + } + /* * Data structure of write state flags used coordinating flushes, * compactions and closes. 
*/ @@ -261,8 +284,8 @@ private boolean splitRequest; private byte[] explicitSplitPoint = null; - private final ReadWriteConsistencyControl rwcc = - new ReadWriteConsistencyControl(); + private final MultiVersionConsistencyControl mvcc = + new MultiVersionConsistencyControl(); // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -291,6 +314,7 @@ this.htableDescriptor = null; this.threadWakeFrequency = 0L; this.coprocessorHost = null; + this.scannerReadPoints = new ConcurrentHashMap(); } /** @@ -334,6 +358,7 @@ String encodedNameStr = this.regionInfo.getEncodedName(); setHTableSpecificConf(); this.regiondir = getRegionDir(this.tableDir, encodedNameStr); + this.scannerReadPoints = new ConcurrentHashMap(); // don't initialize coprocessors if not running within a regionserver // TODO: revisit if coprocessors should load in other cases @@ -415,6 +440,8 @@ // min across all the max. long minSeqId = -1; long maxSeqId = -1; + // initialized to -1 so that we pick up MemstoreTS from column families + long maxMemstoreTS = -1; for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + c); Store store = instantiateHStore(this.tableDir, c); @@ -426,7 +453,12 @@ if (maxSeqId == -1 || storeSeqId > maxSeqId) { maxSeqId = storeSeqId; } + long maxStoreMemstoreTS = store.getMaxMemstoreTS(); + if (maxStoreMemstoreTS > maxMemstoreTS) { + maxMemstoreTS = maxStoreMemstoreTS; + } } + mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.regiondir, minSeqId, reporter, status)); @@ -635,8 +667,8 @@ } } - public ReadWriteConsistencyControl getRWCC() { - return rwcc; + public MultiVersionConsistencyControl getMVCC() { + return mvcc; } /** @@ -1159,6 +1191,7 @@ // during the flush long sequenceId = -1L; long completeSequenceId = -1L; + MultiVersionConsistencyControl.WriteEntry w = null; // We have to take a write lock during snapshot, or else a write could // end up in both snapshot and memstore (makes it difficult to do atomic @@ -1169,6 +1202,10 @@ long currentMemStoreSize = 0; List storeFlushers = new ArrayList(stores.size()); try { + // Record the mvcc for all transactions in progress. + w = mvcc.beginMemstoreInsert(); + mvcc.advanceMemstore(w); + sequenceId = (wal == null)? myseqid: wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); @@ -1184,10 +1221,17 @@ } finally { this.updatesLock.writeLock().unlock(); } - String s = "Finished snapshotting " + this + ", commencing wait for rwcc"; + String s = "Finished snapshotting " + this + ", commencing wait for mvcc"; status.setStatus(s); LOG.debug(s); + // wait for all in-progress transactions to commit to HLog before + // we can start the flush. This prevents + // uncommitted transactions from being written into HFiles. + // We have to block before we start the flush, otherwise keys that + // were removed via a rollbackMemstore could be written to Hfiles. + mvcc.waitForRead(w); + // Any failure from here on out will be catastrophic requiring server // restart so hlog content can be replayed and put back into the memstore. // Otherwise, the snapshot content while backed up in the hlog, it will not @@ -1569,6 +1613,8 @@ this.put(put, lockid, put.getWriteToWAL()); } + + /** * @param put * @param lockid @@ -2098,10 +2144,10 @@ * new entries. 
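 * (Atomicity here comes from the MVCC WriteEntry that the method body opens before
 * adding the edits and completes afterwards; concurrent scanners should therefore see
 * either all of the family map's new entries or none of them.)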
*/ private long applyFamilyMapToMemstore(Map> familyMap) { - ReadWriteConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry w = null; long size = 0; try { - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); @@ -2114,7 +2160,7 @@ } } } finally { - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); } return size; } @@ -2758,6 +2804,7 @@ } RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); + this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -2769,7 +2816,12 @@ // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + // synchronize on scannerReadPoints so that nobody calculates + // getSmallestReadPoint, before scannerReadPoints is updated. + synchronized(scannerReadPoints) { + this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); + scannerReadPoints.put(this, this.readPt); + } List scanners = new ArrayList(); if (additionalScanners != null) { @@ -2779,7 +2831,8 @@ for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - scanners.add(store.getScanner(scan, entry.getValue())); + StoreScanner scanner = store.getScanner(scan, entry.getValue()); + scanners.add(scanner); } this.storeHeap = new KeyValueHeap(scanners, comparator); } @@ -2810,7 +2863,7 @@ try { // This could be a new thread from the last time we called next(). - ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); + MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); results.clear(); @@ -2930,6 +2983,8 @@ storeHeap.close(); storeHeap = null; } + // no need to sychronize here. 
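// (scannerReadPoints is a ConcurrentHashMap, so the remove() below is safe on its own;
// the synchronized block is only needed when a scanner registers its read point, so that
// getSmallestReadPoint() cannot miss a scanner that is in the middle of being created.)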
+ scannerReadPoints.remove(this); this.filterClosed = true; } @@ -3633,7 +3688,7 @@ public Result increment(Increment increment, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of increments atomic to reads + // TODO: Use MVCC to make this set of increments atomic to reads byte [] row = increment.getRow(); checkRow(row, "increment"); TimeRange tr = increment.getTimeRange(); @@ -3830,7 +3885,7 @@ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 28 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + + 29 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); @@ -3839,12 +3894,12 @@ (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing ClassSize.ATOMIC_LONG + // memStoreSize ClassSize.ATOMIC_INTEGER + // lockIdGenerator - (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds + (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints WriteState.HEAP_SIZE + // writestate ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock ClassSize.ARRAYLIST + // recentFlushes - ReadWriteConsistencyControl.FIXED_SIZE // rwcc + MultiVersionConsistencyControl.FIXED_SIZE // mvcc ; @Override @@ -3853,7 +3908,7 @@ for(Store store : this.stores.values()) { heapSize += store.heapSize(); } - // this does not take into account row locks, recent flushes, rwcc entries + // this does not take into account row locks, recent flushes, mvcc entries return heapSize; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -228,6 +228,13 @@ } /** + * @return The maximum memstoreTS in all store files. + */ + public long getMaxMemstoreTS() { + return StoreFile.getMaxMemstoreTSInList(this.getStorefiles()); + } + + /** * @param tabledir * @param encodedName Encoded region name. * @param family @@ -391,10 +398,15 @@ ArrayList newFiles = new ArrayList(storefiles); newFiles.add(sf); this.storefiles = sortAndClone(newFiles); - notifyChangedReadersObservers(); } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. this.lock.writeLock().unlock(); } + notifyChangedReadersObservers(); LOG.info("Successfully loaded store file " + srcPath + " into store " + this + " (new location: " + dstPath + ")"); } @@ -476,6 +488,8 @@ throws IOException { StoreFile.Writer writer; String fileName; + // Find the smallest read point across all the Scanners. + long smallestReadPoint = region.getSmallestReadPoint(); long flushed = 0; // Don't flush if there are no entries. if (set.size() == 0) { @@ -488,7 +502,7 @@ // pass true as the StoreScanner's retainDeletesInOutput argument. InternalScanner scanner = new StoreScanner(this, scan, Collections.singletonList(new CollectionBackedScanner(set, - this.comparator)), true); + this.comparator)), true, this.region.getSmallestReadPoint()); try { // TODO: We can fail in the below block before we complete adding this // flush to list of store files. 
Add cleanup of anything put on filesystem @@ -506,6 +520,14 @@ hasMore = scanner.next(kvs); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to disk. + if (kv.getMemstoreTS() <= smallestReadPoint) { + // let us not change the original KV. It could be in the memstore + // changing its memstoreTS could affect other threads/scanners. + kv = kv.shallowCopy(); + kv.setMemstoreTS(0); + } writer.append(kv); flushed += this.memstore.heapSizeChange(kv, true); } @@ -587,15 +609,21 @@ ArrayList newList = new ArrayList(storefiles); newList.add(sf); storefiles = sortAndClone(newList); + this.memstore.clearSnapshot(set); - - // Tell listeners of the change in readers. - notifyChangedReadersObservers(); - - return needsCompaction(); } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. this.lock.writeLock().unlock(); } + + // Tell listeners of the change in readers. + notifyChangedReadersObservers(); + + return needsCompaction(); } /* @@ -608,6 +636,34 @@ } } + protected List getScanners(boolean cacheBlocks, + boolean isGet, + boolean isCompaction) throws IOException { + List storeFiles; + List memStoreScanners; + this.lock.readLock().lock(); + try { + storeFiles = this.getStorefiles(); + memStoreScanners = this.memstore.getScanners(); + } finally { + this.lock.readLock().unlock(); + } + + // First the store file scanners + + // TODO this used to get the store files in descending order, + // but now we get them in ascending order, which I think is + // actually more correct, since memstore get put at the end. + List sfScanners = StoreFileScanner + .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction); + List scanners = + new ArrayList(sfScanners.size()+1); + scanners.addAll(sfScanners); + // Then the memstore scanners + scanners.addAll(memStoreScanners); + return scanners; + } + /* * @param o Observer who wants to know about changes in set of Readers */ @@ -1128,18 +1184,21 @@ // For each file, obtain a scanner: List scanners = StoreFileScanner - .getScannersForStoreFiles(filesToCompact, false, false); + .getScannersForStoreFiles(filesToCompact, false, false, true); // Make the instantiation lazy in case compaction produces no product; i.e. // where all source cells are expired or deleted. StoreFile.Writer writer = null; + // Find the smallest read point across all the Scanners. 
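// Both the flush hunk above and the compaction hunk below apply the same rule:
// once every open scanner can already see a KeyValue (its memstoreTS is at or
// below the smallest read point), the timestamp carries no information and can
// be written out as 0, which encodes as a single vint byte on disk. A minimal
// sketch of that rule, using a stand-in cell type (assumed names, not the
// HBase KeyValue API):

class MemstoreTsCleaner {
  static final class Cell {
    final byte[] key;
    final byte[] value;
    long memstoreTS;
    Cell(byte[] key, byte[] value, long memstoreTS) {
      this.key = key; this.value = value; this.memstoreTS = memstoreTS;
    }
    Cell shallowCopy() {            // copy the envelope, share the byte arrays
      return new Cell(key, value, memstoreTS);
    }
  }

  /** Flush path: never mutate the memstore's copy, other scanners still read it. */
  static Cell forFlush(Cell kv, long smallestReadPoint) {
    if (kv.memstoreTS <= smallestReadPoint) {
      Cell copy = kv.shallowCopy();
      copy.memstoreTS = 0;
      return copy;
    }
    return kv;
  }

  /** Compaction path: the cell was just deserialized from a store file and is
   *  private to this compaction, so it can be cleared in place. */
  static Cell forCompaction(Cell kv, long smallestReadPoint) {
    if (kv.memstoreTS <= smallestReadPoint) {
      kv.memstoreTS = 0;
    }
    return kv;
  }
}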
+ long smallestReadPoint = region.getSmallestReadPoint(); + MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); try { InternalScanner scanner = null; try { Scan scan = new Scan(); scan.setMaxVersions(family.getMaxVersions()); /* include deletes, unless we are doing a major compaction */ - scanner = new StoreScanner(this, scan, scanners, !majorCompaction); + scanner = new StoreScanner(this, scan, scanners, !majorCompaction, smallestReadPoint); if (region.getCoprocessorHost() != null) { InternalScanner cpScanner = region.getCoprocessorHost().preCompact( this, scanner); @@ -1166,6 +1225,9 @@ if (writer != null) { // output to writer: for (KeyValue kv : kvs) { + if (kv.getMemstoreTS() <= smallestReadPoint) { + kv.setMemstoreTS(0); + } writer.append(kv); // update progress per key ++progress.currentCompactedKVs; @@ -1268,8 +1330,8 @@ this.family.getBloomFilterType()); result.createReader(); } - this.lock.writeLock().lock(); try { + this.lock.writeLock().lock(); try { // Change this.storefiles so it reflects new state but do not // delete old store files until we have sent out notification of @@ -1285,34 +1347,40 @@ } this.storefiles = sortAndClone(newStoreFiles); + } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. + this.lock.writeLock().unlock(); + } - // Tell observers that list of StoreFiles has changed. - notifyChangedReadersObservers(); - // Finally, delete old store files. - for (StoreFile hsf: compactedFiles) { - hsf.deleteReader(); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files in " + this.storeNameStr + - ". Compacted file is " + (result == null? "none": result.toString()) + - ". Files replaced " + compactedFiles.toString() + - " some of which may have been already removed", e); + // Tell observers that list of StoreFiles has changed. + notifyChangedReadersObservers(); + // Finally, delete old store files. + for (StoreFile hsf: compactedFiles) { + hsf.deleteReader(); } - // 4. Compute new store size - this.storeSize = 0L; - this.totalUncompressedBytes = 0L; - for (StoreFile hsf : this.storefiles) { - StoreFile.Reader r = hsf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + hsf + " has a null Reader"); - continue; - } - this.storeSize += r.length(); - this.totalUncompressedBytes += r.getTotalUncompressedBytes(); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files in " + this.storeNameStr + + ". Compacted file is " + (result == null? "none": result.toString()) + + ". Files replaced " + compactedFiles.toString() + + " some of which may have been already removed", e); + } + + // 4. Compute new store size + this.storeSize = 0L; + this.totalUncompressedBytes = 0L; + for (StoreFile hsf : this.storefiles) { + StoreFile.Reader r = hsf.getReader(); + if (r == null) { + LOG.warn("StoreFile " + hsf + " has a null Reader"); + continue; } - } finally { - this.lock.writeLock().unlock(); + this.storeSize += r.length(); + this.totalUncompressedBytes += r.getTotalUncompressedBytes(); } return result; } @@ -1423,7 +1491,7 @@ firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP); } // Get a scanner that caches blocks and that uses pread. 
- HFileScanner scanner = r.getHFileReader().getScanner(true, true); + HFileScanner scanner = r.getHFileReader().getScanner(true, true, false); // Seek scanner. If can't seek it, return. if (!seekToScanner(scanner, firstOnRow, firstKV)) return; // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN! @@ -1614,7 +1682,7 @@ * Return a scanner for both the memstore and the HStore files * @throws IOException */ - public KeyValueScanner getScanner(Scan scan, + public StoreScanner getScanner(Scan scan, final NavigableSet targetCols) throws IOException { lock.readLock().lock(); try { @@ -1771,7 +1839,7 @@ throws IOException { this.lock.readLock().lock(); try { - // TODO: Make this operation atomic w/ RWCC + // TODO: Make this operation atomic w/ MVCC return this.memstore.upsert(kvs); } finally { this.lock.readLock().unlock(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileWriterV1; +import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; @@ -141,6 +142,18 @@ // Set when we obtain a Reader. private long sequenceid = -1; + // max of the MemstoreTS in the KV's in this store + // Set when we obtain a Reader. + private long maxMemstoreTS = -1; + + public long getMaxMemstoreTS() { + return maxMemstoreTS; + } + + public void setMaxMemstoreTS(long maxMemstoreTS) { + this.maxMemstoreTS = maxMemstoreTS; + } + // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. private AtomicBoolean majorCompaction = null; @@ -315,6 +328,24 @@ } /** + * Return the largest memstoreTS found across all storefiles in + * the given list. Store files that were created by a mapreduce + * bulk load are ignored, as they do not correspond to any specific + * put operation, and thus do not have a memstoreTS associated with them. + * @return 0 if no non-bulk-load files are provided or, this is Store that + * does not yet have any store files. + */ + public static long getMaxMemstoreTSInList(Collection sfs) { + long max = 0; + for (StoreFile sf : sfs) { + if (!sf.isBulkLoadResult()) { + max = Math.max(max, sf.getMaxMemstoreTS()); + } + } + return max; + } + + /** * Return the highest sequence ID found across all storefiles in * the given list. Store files that were created by a mapreduce * bulk load are ignored, as they do not correspond to any edit @@ -463,6 +494,11 @@ } this.reader.setSequenceID(this.sequenceid); + b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); + if (b != null) { + this.maxMemstoreTS = Bytes.toLong(b); + } + b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { boolean mc = Bytes.toBoolean(b); @@ -994,8 +1030,21 @@ * @return a scanner */ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { - return new StoreFileScanner(this, getScanner(cacheBlocks, pread)); + return getStoreFileScanner(cacheBlocks, pread, false); } + /** + * Get a scanner to scan over this StoreFile. + * + * @param cacheBlocks should this scanner cache blocks? 
+ * @param pread use pread (for highly concurrent small readers) + * @param isCompaction is scanner being used for compaction? + * @return a scanner + */ + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, + boolean pread, boolean isCompaction) { + return new StoreFileScanner(this, getScanner(cacheBlocks, pread, + isCompaction), !isCompaction); + } /** * Warning: Do not write further code which depends on this call. Instead @@ -1008,9 +1057,28 @@ */ @Deprecated public HFileScanner getScanner(boolean cacheBlocks, boolean pread) { - return reader.getScanner(cacheBlocks, pread); + return getScanner(cacheBlocks, pread, false); } + /** + * Warning: Do not write further code which depends on this call. Instead + * use getStoreFileScanner() which uses the StoreFileScanner class/interface + * which is the preferred way to scan a store with higher level concepts. + * + * @param cacheBlocks + * should we cache the blocks? + * @param pread + * use pread (for concurrent small readers) + * @param isCompaction + * is scanner being used for compaction? + * @return the underlying HFileScanner + */ + @Deprecated + public HFileScanner getScanner(boolean cacheBlocks, boolean pread, + boolean isCompaction) { + return reader.getScanner(cacheBlocks, pread, isCompaction); + } + public void close(boolean evictOnClose) throws IOException { reader.close(evictOnClose); } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (working copy) @@ -373,13 +373,13 @@ * Implementation of {@link HFileScanner} interface. */ protected static class ScannerV1 extends AbstractHFileReader.Scanner { - private final HFileReaderV1 readerV1; + private final HFileReaderV1 reader; private int currBlock; public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(reader, cacheBlocks, pread, isCompaction); - readerV1 = reader; + super(cacheBlocks, pread, isCompaction); + this.reader = reader; } @Override @@ -446,7 +446,7 @@ blockBuffer = null; return false; } - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -466,7 +466,7 @@ @Override public int seekTo(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return -1; // falls before the beginning of the file! :-( // Avoid re-reading the same block (that'd be dumb). loadBlock(b, true); @@ -492,7 +492,7 @@ } } - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) { return -1; } @@ -559,7 +559,7 @@ @Override public boolean seekBefore(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return false; // key is before the start of the file. 
@@ -611,7 +611,7 @@ return true; } currBlock = 0; - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -621,13 +621,13 @@ private void loadBlock(int bloc, boolean rewind) throws IOException { if (blockBuffer == null) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; } else { if (bloc != currBlock) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (working copy) @@ -19,7 +19,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.io.WritableUtils; /** * {@link HFile} reader for version 2. @@ -45,8 +48,14 @@ * The size of a (key length, value length) tuple that prefixes each entry in * a data block. */ - private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + private boolean includesMemstoreTS = false; + + private boolean shouldIncludeMemstoreTS() { + return includesMemstoreTS; + } + /** * A "sparse lock" implementation allowing to lock on a particular block * identified by offset. The purpose of this is to avoid two clients loading @@ -114,6 +123,9 @@ lastKey = fileInfo.get(FileInfo.LASTKEY); avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN)); avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); + includesMemstoreTS = (keyValueFormatVersion != null && + Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE); // Store all other load-on-open blocks for further consumption. 
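// The added check above is the read side of a small handshake with
// HFileWriterV2: the writer records a KEY_VALUE_VERSION entry in the file-info
// block, and only files that declare the "with memstore TS" version get the
// extra vlong decoded after each key/value pair. A sketch of that check, with
// the file-info map keyed by String for brevity (the real FileInfo is keyed by
// byte[] and read with Bytes.toInt):

import java.nio.ByteBuffer;
import java.util.Map;

class KeyValueFormat {
  /** Mirrors HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE in this patch. */
  static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  static boolean includesMemstoreTS(Map<String, byte[]> fileInfo) {
    byte[] versionBytes = fileInfo.get("KEY_VALUE_VERSION");
    if (versionBytes == null) {
      return false;             // older file: plain (klen, vlen, key, value) records
    }
    return ByteBuffer.wrap(versionBytes).getInt() == KEY_VALUE_VER_WITH_MEMSTORE;
  }
}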
HFileBlock b; @@ -315,19 +327,30 @@ */ protected static class ScannerV2 extends AbstractHFileReader.Scanner { private HFileBlock block; + private HFileReaderV2 reader; public ScannerV2(HFileReaderV2 r, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(r, cacheBlocks, pread, isCompaction); + super(cacheBlocks, pread, isCompaction); + this.reader = r; } @Override + public HFileReaderV2 getReader() { + return reader; + } + + @Override public KeyValue getKeyValue() { if (!isSeeked()) return null; - return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position()); + if (this.reader.shouldIncludeMemstoreTS()) { + ret.setMemstoreTS(currMemstoreTS); + } + return ret; } @Override @@ -353,6 +376,8 @@ blockBuffer = null; currKeyLen = 0; currValueLen = 0; + currMemstoreTS = 0; + currMemstoreTSLen = 0; } /** @@ -368,7 +393,7 @@ try { blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE - + currKeyLen + currValueLen); + + currKeyLen + currValueLen + currMemstoreTSLen); } catch (IllegalArgumentException e) { LOG.error("Current pos = " + blockBuffer.position() + "; currKeyLen = " + currKeyLen + "; currValLen = " @@ -561,6 +586,16 @@ currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen; + currMemstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset); + currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. " + e); + } + } if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit() @@ -588,12 +623,24 @@ private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) { int klen, vlen; + long memstoreTS = 0; + int memstoreTSLen = 0; int lastKeyValueSize = -1; do { blockBuffer.mark(); klen = blockBuffer.getInt(); vlen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + klen + vlen; + memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset); + memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. " + e); + } + } int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE; @@ -615,6 +662,10 @@ } currKeyLen = klen; currValueLen = vlen; + if (this.reader.shouldIncludeMemstoreTS()) { + currMemstoreTS = memstoreTS; + currMemstoreTSLen = memstoreTSLen; + } return 0; // indicate exact match } @@ -626,7 +677,7 @@ } // The size of this key/value tuple, including key/value length fields. 
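// This is the step where the record size has to account for the new trailing
// field. With the patch, each record in a v2 data block is laid out as
//   int keyLen | int valueLen | key bytes | value bytes | vlong memstoreTS
// so both "skip to the next record" and "decode the current record" need the
// vint-encoded length of that last field. A self-contained sketch of the
// arithmetic (assumed helper names; the real code works on the block's
// ByteBuffer and uses Bytes.readVLong):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.WritableUtils;

class RecordLayout {
  static final int KEY_VALUE_LEN_SIZE = 2 * 4;   // two int length fields

  /** @return total size in bytes of the record starting at {@code offset}. */
  static int recordSize(byte[] block, int offset) throws IOException {
    ByteBuffer buf = ByteBuffer.wrap(block, offset, block.length - offset);
    int keyLen = buf.getInt();
    int valueLen = buf.getInt();
    int memstoreTSOffset = offset + KEY_VALUE_LEN_SIZE + keyLen + valueLen;
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(
        block, memstoreTSOffset, block.length - memstoreTSOffset));
    long memstoreTS = WritableUtils.readVLong(in);
    int memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
    return KEY_VALUE_LEN_SIZE + keyLen + valueLen + memstoreTSLen;
  }
}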
- lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE; + lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; blockBuffer.position(blockBuffer.position() + lastKeyValueSize); } while (blockBuffer.remaining() > 0); Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** * Writes HFile format version 2. @@ -46,6 +47,13 @@ public class HFileWriterV2 extends AbstractHFileWriter { static final Log LOG = LogFactory.getLog(HFileWriterV2.class); + /** Max memstore (mvcc) timestamp in FileInfo */ + public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); + /** KeyValue version in FileInfo */ + public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); + /** Version for KeyValue which includes memstore timestamp */ + public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; + /** Inline block writers for multi-level block index and compound Blooms. */ private List inlineBlockWriters = new ArrayList(); @@ -66,6 +74,9 @@ private List additionalLoadOnOpenData = new ArrayList(); + private final boolean includeMemstoreTS = true; + private long maxMemstoreTS = 0; + static class WriterFactoryV2 extends HFile.WriterFactory { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { @@ -298,8 +309,9 @@ */ @Override public void append(final KeyValue kv) throws IOException { - append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), + append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); + this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS()); } /** @@ -314,7 +326,7 @@ */ @Override public void append(final byte[] key, final byte[] value) throws IOException { - append(key, 0, key.length, value, 0, value.length); + append(0, key, 0, key.length, value, 0, value.length); } /** @@ -329,7 +341,7 @@ * @param vlength * @throws IOException */ - private void append(final byte[] key, final int koffset, final int klength, + private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength, final byte[] value, final int voffset, final int vlength) throws IOException { boolean dupKey = checkKey(key, koffset, klength); @@ -342,6 +354,7 @@ newBlock(); // Write length of key and value and then actual key and value bytes. + // Additionally, we may also write down the memstoreTS. { DataOutputStream out = fsBlockWriter.getUserDataStream(); out.writeInt(klength); @@ -350,6 +363,9 @@ totalValueLength += vlength; out.write(key, koffset, klength); out.write(value, voffset, vlength); + if (this.includeMemstoreTS) { + WritableUtils.writeVLong(out, memstoreTS); + } } // Are we the first key in this block? 
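// The write side of the same record format: append() now emits the memstoreTS
// as a trailing vlong and remembers the running maximum, which close() then
// stores under MAX_MEMSTORE_TS_KEY in the file info (next hunk). A minimal
// sketch of the per-record encoding under those assumptions:

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

class RecordWriterSketch {
  private long maxMemstoreTS = 0;

  void appendRecord(DataOutputStream out, byte[] key, byte[] value, long memstoreTS)
      throws IOException {
    out.writeInt(key.length);                    // int keyLen
    out.writeInt(value.length);                  // int valueLen
    out.write(key);                              // key bytes
    out.write(value);                            // value bytes
    WritableUtils.writeVLong(out, memstoreTS);   // trailing vlong memstoreTS
    maxMemstoreTS = Math.max(maxMemstoreTS, memstoreTS);
  }

  /** Value for the MAX_MEMSTORE_TS_KEY file-info entry at close() time. */
  long maxMemstoreTS() {
    return maxMemstoreTS;
  }
}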
@@ -415,6 +431,11 @@ fsBlockWriter.writeHeaderAndData(outputStream); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + if (this.includeMemstoreTS) { + appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); + appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); + } + // File info writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO, false)); @@ -436,6 +457,7 @@ trailer.setComparatorClass(comparator.getClass()); trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); + finishClose(trailer); fsBlockWriter.releaseCompressor(); Index: src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (revision 1204112) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (working copy) @@ -290,7 +290,6 @@ } protected static abstract class Scanner implements HFileScanner { - protected HFile.Reader reader; protected ByteBuffer blockBuffer; protected boolean cacheBlocks; @@ -299,30 +298,26 @@ protected int currKeyLen; protected int currValueLen; + protected int currMemstoreTSLen; + protected long currMemstoreTS; protected int blockFetches; - public Scanner(final HFile.Reader reader, final boolean cacheBlocks, + public Scanner(final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - this.reader = reader; this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; } @Override - public Reader getReader() { - return reader; - } - - @Override public boolean isSeeked(){ return blockBuffer != null; } @Override public String toString() { - return "HFileScanner for reader " + String.valueOf(reader); + return "HFileScanner for reader " + String.valueOf(getReader()); } protected void assertSeeked() {