Index: src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -198,7 +198,7 @@ protected HTableDescriptor createTableDescriptor(final String name, final int versions) { return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS, - versions, HConstants.FOREVER); + versions, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED); } /** @@ -209,21 +209,21 @@ * @return Column descriptor. */ protected HTableDescriptor createTableDescriptor(final String name, - final int minVersions, final int versions, final int ttl) { + final int minVersions, final int versions, final int ttl, boolean keepDeleted) { HTableDescriptor htd = new HTableDescriptor(name); htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions, - HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, - HColumnDescriptor.DEFAULT_BLOOMFILTER, - HConstants.REPLICATION_SCOPE_LOCAL)); + keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, + HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HConstants.REPLICATION_SCOPE_LOCAL)); htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions, - HColumnDescriptor.DEFAULT_COMPRESSION, false, false, + keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, HColumnDescriptor.DEFAULT_BLOOMFILTER, HConstants.REPLICATION_SCOPE_LOCAL)); htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions, - HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, + keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, + HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, HColumnDescriptor.DEFAULT_BLOOMFILTER, HConstants.REPLICATION_SCOPE_LOCAL)); return htd; Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (working copy) @@ -24,16 +24,13 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; +import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.util.Bytes; -import org.mockito.Mockito; -import org.mockito.stubbing.OngoingStubbing; -import com.google.common.collect.Lists; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; @@ -42,6 +39,9 @@ public class TestStoreScanner extends TestCase { private static final String CF_STR = "cf"; final byte [] CF = Bytes.toBytes(CF_STR); + private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, false, + KeyValue.COMPARATOR); + private ScanType scanType = ScanType.USER_SCAN; /* * Test utility for building a NavigableSet for scanners. 
@@ -74,9 +74,8 @@ Scan scanSpec = new Scan(Bytes.toBytes(r1)); scanSpec.setTimeRange(0, 6); scanSpec.setMaxVersions(); - StoreScanner scan = - new StoreScanner(scanSpec, CF, Long.MAX_VALUE, - KeyValue.COMPARATOR, getCols("a"), scanners); + StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, + getCols("a"), scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(5, results.size()); @@ -85,8 +84,8 @@ scanSpec = new Scan(Bytes.toBytes(r1)); scanSpec.setTimeRange(1, 3); scanSpec.setMaxVersions(); - scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE, - KeyValue.COMPARATOR, getCols("a"), scanners); + scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a"), + scanners); results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(2, results.size()); @@ -94,8 +93,8 @@ scanSpec = new Scan(Bytes.toBytes(r1)); scanSpec.setTimeRange(5, 10); scanSpec.setMaxVersions(); - scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE, - KeyValue.COMPARATOR, getCols("a"), scanners); + scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a"), + scanners); results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(1, results.size()); @@ -104,8 +103,8 @@ scanSpec = new Scan(Bytes.toBytes(r1)); scanSpec.setTimeRange(0, 10); scanSpec.setMaxVersions(3); - scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE, - KeyValue.COMPARATOR, getCols("a"), scanners); + scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a"), + scanners); results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(3, results.size()); @@ -124,10 +123,8 @@ Scan scanSpec = new Scan(Bytes.toBytes("R1")); // this only uses maxVersions (default=1) and TimeRange (default=all) - StoreScanner scan = - new StoreScanner(scanSpec, CF, Long.MAX_VALUE, - KeyValue.COMPARATOR, getCols("a"), - scanners); + StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, + getCols("a"), scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); @@ -153,10 +150,8 @@ Scan scanSpec = new Scan(Bytes.toBytes("R1")); // this only uses maxVersions (default=1) and TimeRange (default=all) - StoreScanner scan = - new StoreScanner(scanSpec, CF, Long.MAX_VALUE, - KeyValue.COMPARATOR, getCols("a"), - scanners); + StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, + getCols("a"), scanners); List results = new ArrayList(); scan.next(results); @@ -183,9 +178,8 @@ }; List scanners = scanFixture(kvs); Scan scanSpec = new Scan(Bytes.toBytes("R1")); - StoreScanner scan = - new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - getCols("a"), scanners); + StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, + getCols("a"), scanners); List results = new ArrayList(); assertFalse(scan.next(results)); @@ -204,9 +198,8 @@ }; List scanners = scanFixture(kvs); Scan scanSpec = new Scan(Bytes.toBytes("R1")); - StoreScanner scan = - new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - getCols("a"), scanners); + StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, + getCols("a"), scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); @@ -232,9 +225,8 @@ }; List scanners = scanFixture(kvs1, kvs2); - StoreScanner scan = - new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - getCols("a"), scanners); + StoreScanner scan = new StoreScanner(new Scan(Bytes.toBytes("R1")), + scanInfo, scanType, 
getCols("a"), scanners); List results = new ArrayList(); // the two put at ts=now will be masked by the 1 delete, and // since the scan default returns 1 version we'll return the newest @@ -258,9 +250,8 @@ List scanners = scanFixture(kvs1, kvs2); Scan scanSpec = new Scan(Bytes.toBytes("R1")).setMaxVersions(2); - StoreScanner scan = - new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - getCols("a"), scanners); + StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, + getCols("a"), scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(2, results.size()); @@ -275,9 +266,8 @@ KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.DeleteColumn, "dont-care"), }; List scanners = scanFixture(kvs); - StoreScanner scan = - new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - null, scanners); + StoreScanner scan = new StoreScanner(new Scan(Bytes.toBytes("R1")), + scanInfo, scanType, null, scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(2, results.size()); @@ -305,9 +295,8 @@ }; List scanners = scanFixture(kvs); - StoreScanner scan = - new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - null, scanners); + StoreScanner scan = new StoreScanner(new Scan().setMaxVersions(2), + scanInfo, scanType, null, scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(5, results.size()); @@ -334,9 +323,9 @@ KeyValueTestUtil.create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), }; List scanners = scanFixture(kvs); - StoreScanner scan = - new StoreScanner(new Scan().setMaxVersions(Integer.MAX_VALUE), CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - null, scanners); + StoreScanner scan = new StoreScanner( + new Scan().setMaxVersions(Integer.MAX_VALUE), scanInfo, scanType, null, + scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(0, results.size()); @@ -355,9 +344,8 @@ KeyValueTestUtil.create("R1", "cf", "b", 5, KeyValue.Type.Put, "dont-care") }; List scanners = scanFixture(kvs); - StoreScanner scan = - new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - null, scanners); + StoreScanner scan = new StoreScanner(new Scan(), scanInfo, scanType, null, + scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); assertEquals(1, results.size()); @@ -379,9 +367,8 @@ public void testSkipColumn() throws IOException { List scanners = scanFixture(kvs); - StoreScanner scan = - new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - getCols("a", "d"), scanners); + StoreScanner scan = new StoreScanner(new Scan(), scanInfo, scanType, + getCols("a", "d"), scanners); List results = new ArrayList(); assertEquals(true, scan.next(results)); @@ -417,8 +404,11 @@ List scanners = scanFixture(kvs); Scan scan = new Scan(); scan.setMaxVersions(1); + ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, + KeyValue.COMPARATOR); + ScanType scanType = ScanType.USER_SCAN; StoreScanner scanner = - new StoreScanner(scan, CF, 500, KeyValue.COMPARATOR, + new StoreScanner(scan, scanInfo, scanType, null, scanners); List results = new ArrayList(); @@ -440,11 +430,9 @@ public void testScannerReseekDoesntNPE() throws Exception { List scanners = scanFixture(kvs); - StoreScanner scan = - new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - getCols("a", "d"), scanners); + StoreScanner scan = new 
StoreScanner(new Scan(), scanInfo, scanType, + getCols("a", "d"), scanners); - // Previously a updateReaders twice in a row would cause an NPE. In test this would also // normally cause an NPE because scan.store is null. So as long as we get through these // two calls we are good and the bug was quashed. @@ -467,9 +455,8 @@ }; List scanners = scanFixture(kvs); Scan scanSpec = new Scan(Bytes.toBytes("R1")); - StoreScanner scan = - new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR, - getCols("a"), scanners); + StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, + getCols("a"), scanners); assertNull(scan.peek()); } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; @@ -51,7 +52,7 @@ private Get get; long ttl = Long.MAX_VALUE; - KeyComparator rowComparator; + KVComparator rowComparator; private Scan scan; public void setUp() throws Exception { @@ -76,7 +77,7 @@ get.addColumn(fam2, col5); this.scan = new Scan(get); - rowComparator = KeyValue.KEY_COMPARATOR; + rowComparator = KeyValue.COMPARATOR; } @@ -95,8 +96,9 @@ expected.add(ScanQueryMatcher.MatchCode.DONE); // 2,4,5 - ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2, - get.getFamilyMap().get(fam2), ttl, rowComparator, 1); + + ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, + 0, 1, ttl, false, rowComparator), get.getFamilyMap().get(fam2)); List memstore = new ArrayList(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -139,7 +141,8 @@ expected.add(ScanQueryMatcher.MatchCode.INCLUDE); expected.add(ScanQueryMatcher.MatchCode.DONE); - ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2, null, ttl, rowComparator, 1); + ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, + 0, 1, ttl, false, rowComparator), null); List memstore = new ArrayList(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -189,8 +192,8 @@ ScanQueryMatcher.MatchCode.DONE }; - ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2, - get.getFamilyMap().get(fam2), testTTL, rowComparator, 1); + ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, + 0, 1, testTTL, false, rowComparator), get.getFamilyMap().get(fam2)); long now = System.currentTimeMillis(); KeyValue [] kvs = new KeyValue[] { @@ -241,8 +244,8 @@ ScanQueryMatcher.MatchCode.DONE }; - ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2, - null, testTTL, rowComparator, 1); + ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, + 0, 1, testTTL, false, rowComparator), null); long now = System.currentTimeMillis(); KeyValue [] kvs = new KeyValue[] { Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (working 
copy) @@ -27,6 +27,7 @@ import java.util.Arrays; import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; @@ -54,7 +55,8 @@ long timestamp = 0; //"Match" for(byte [] col : scannerColumns){ - result.add(exp.checkColumn(col, 0, col.length, ++timestamp)); + result.add(exp.checkColumn(col, 0, col.length, ++timestamp, + KeyValue.Type.Put.getCode())); } assertEquals(expected.size(), result.size()); @@ -166,13 +168,13 @@ Long.MAX_VALUE); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1); + explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode()); } explicit.update(); for (int i = 1; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1); + explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode()); } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java (working copy) @@ -49,7 +49,7 @@ * Verify behavior of getClosestBefore(...) */ public void testGetClosestBefore() throws Exception { - HTableDescriptor htd = createTableDescriptor(getName(), 1, 1000, 1); + HTableDescriptor htd = createTableDescriptor(getName(), 1, 1000, 1, false); HRegion region = createNewHRegion(htd, null, null); long ts = System.currentTimeMillis() - 2000; // 2s in the past @@ -92,7 +92,7 @@ */ public void testStoreMemStore() throws Exception { // keep 3 versions minimum - HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1); + HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, false); HRegion region = createNewHRegion(htd, null, null); long ts = System.currentTimeMillis() - 2000; // 2s in the past @@ -141,7 +141,7 @@ * Make sure the Deletes behave as expected with minimum versions */ public void testDelete() throws Exception { - HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1); + HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, false); HRegion region = createNewHRegion(htd, null, null); long ts = System.currentTimeMillis() - 2000; // 2s in the past @@ -193,7 +193,7 @@ * Make sure the memstor behaves correctly with minimum versions */ public void testMemStore() throws Exception { - HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1); + HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1, false); HRegion region = createNewHRegion(htd, null, null); long ts = System.currentTimeMillis() - 2000; // 2s in the past @@ -262,7 +262,7 @@ */ public void testBaseCase() throws Exception { // 1 version minimum, 1000 versions maximum, ttl = 1s - HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1); + HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1, false); HRegion region = createNewHRegion(htd, null, null); long ts = System.currentTimeMillis() - 2000; // 2s in the past @@ -347,7 +347,7 @@ * minimum versions enabled. 
*/ public void testFilters() throws Exception { - HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1); + HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1, false); HRegion region = createNewHRegion(htd, null, null); final byte [] c1 = COLUMNS[1]; @@ -408,7 +408,7 @@ g.setMaxVersions(); r = region.get(g, null); checkResult(r, c0, T2); -} + } private void checkResult(Result r, byte[] col, byte[] ... vals) { assertEquals(r.size(), vals.length); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (working copy) @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; @@ -54,7 +55,7 @@ for(byte [] qualifier : qualifiers) { ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0, - qualifier.length, 1); + qualifier.length, 1, KeyValue.Type.Put.getCode()); actual.add(mc); } @@ -87,7 +88,7 @@ long timestamp = 0; for(byte [] qualifier : qualifiers) { MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length, - ++timestamp); + ++timestamp, KeyValue.Type.Put.getCode()); actual.add(mc); } @@ -110,7 +111,8 @@ try { for(byte [] qualifier : qualifiers) { - tracker.checkColumn(qualifier, 0, qualifier.length, 1); + tracker.checkColumn(qualifier, 0, qualifier.length, 1, + KeyValue.Type.Put.getCode()); } } catch (Exception e) { ok = true; Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java (revision 0) @@ -0,0 +1,725 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + +public class TestKeepDeletes extends HBaseTestCase { + private final byte[] T0 = Bytes.toBytes("0"); + private final byte[] T1 = Bytes.toBytes("1"); + private final byte[] T2 = Bytes.toBytes("2"); + private final byte[] T3 = Bytes.toBytes("3"); + private final byte[] T4 = Bytes.toBytes("4"); + private final byte[] T5 = Bytes.toBytes("5"); + private final byte[] T6 = Bytes.toBytes("6"); + + private final byte[] c0 = COLUMNS[0]; + private final byte[] c1 = COLUMNS[1]; + + /** + * Make sure that deleted rows are retained. + * Family delete markers are deleted. + * Column Delete markers are versioned + * Time range scan of deleted rows are possible + */ + public void testBasicScenario() throws Exception { + // keep 3 versions, rows do not expire + HTableDescriptor htd = createTableDescriptor(getName(), 0, 3, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + region.put(p); + p = new Put(T1, ts+1); + p.add(c0, c0, T2); + region.put(p); + p = new Put(T1, ts+2); + p.add(c0, c0, T3); + region.put(p); + p = new Put(T1, ts+4); + p.add(c0, c0, T4); + region.put(p); + + // now place a delete marker at ts+2 + Delete d = new Delete(T1, ts+2, null); + region.delete(d, null, true); + + // a raw scan can see the delete markers + // (one for each column family) + assertEquals(3, countDeleteMarkers(region)); + + // get something *before* the delete marker + Get g = new Get(T1); + g.setMaxVersions(); + g.setTimeRange(0L, ts+2); + Result r = region.get(g, null); + checkResult(r, c0, c0, T2,T1); + + // flush + region.flushcache(); + + // yep, T2 still there, T1 gone + r = region.get(g, null); + checkResult(r, c0, c0, T2); + + // major compact + region.compactStores(true); + region.compactStores(true); + + // one delete marker left (the others did not + // have older puts) + assertEquals(1, countDeleteMarkers(region)); + + // still there (even after multiple compactions) + r = region.get(g, null); + checkResult(r, c0, c0, T2); + + // a timerange that includes the delete marker won't see past rows + g.setTimeRange(0L, ts+4); + r = region.get(g, null); + assertTrue(r.isEmpty()); + + // two more puts, this will expire the older puts. 
+ p = new Put(T1, ts+5); + p.add(c0, c0, T5); + region.put(p); + p = new Put(T1, ts+6); + p.add(c0, c0, T6); + region.put(p); + + // also add an old put again + // (which is past the max versions) + p = new Put(T1, ts); + p.add(c0, c0, T1); + region.put(p); + r = region.get(g, null); + assertTrue(r.isEmpty()); + + region.flushcache(); + region.compactStores(true); + region.compactStores(true); + + // verify that the delete marker itself was collected + region.put(p); + r = region.get(g, null); + checkResult(r, c0, c0, T1); + assertEquals(0, countDeleteMarkers(region)); + } + + /** + * Even when the store does not keep deletes a "raw" scan will + * return everything it can find (unless discarding cells is guaranteed + * to have no effect). + * Assuming this is the desired behavior. Could also disallow "raw" scanning + * if the store does not have KEEP_DELETED_CELLS enabled. + * (can be changed easily) + */ + public void testRawScanWithoutKeepingDeletes() throws Exception { + // KEEP_DELETED_CELLS is NOT enabled + HTableDescriptor htd = createTableDescriptor(getName(), 0, 3, + HConstants.FOREVER, false); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + region.put(p); + + Delete d = new Delete(T1, ts, null); + d.deleteColumn(c0, c0, ts); + region.delete(d, null, true); + + // scan still returns delete markers and deleted rows + Scan s = new Scan(); + s.setRaw(true); + s.setMaxVersions(); + InternalScanner scan = region.getScanner(s); + List kvs = new ArrayList(); + scan.next(kvs); + assertEquals(2, kvs.size()); + + region.flushcache(); + region.compactStores(true); + + // after compaction they are gone + // (note that this is a test with a Store without + // KEEP_DELETED_CELLS) + s = new Scan(); + s.setRaw(true); + s.setMaxVersions(); + scan = region.getScanner(s); + kvs = new ArrayList(); + scan.next(kvs); + assertTrue(kvs.isEmpty()); + } + + /** + * basic verification of existing behavior + */ + public void testWithoutKeepingDeletes() throws Exception { + // KEEP_DELETED_CELLS is NOT enabled + HTableDescriptor htd = createTableDescriptor(getName(), 0, 3, + HConstants.FOREVER, false); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + region.put(p); + Delete d = new Delete(T1, ts+2, null); + d.deleteColumn(c0, c0, ts); + region.delete(d, null, true); + + // "past" get does not see rows behind delete marker + Get g = new Get(T1); + g.setMaxVersions(); + g.setTimeRange(0L, ts+1); + Result r = region.get(g, null); + assertTrue(r.isEmpty()); + + // "past" scan does not see rows behind delete marker + Scan s = new Scan(); + s.setMaxVersions(); + s.setTimeRange(0L, ts+1); + InternalScanner scanner = region.getScanner(s); + List kvs = new ArrayList(); + while(scanner.next(kvs)); + assertTrue(kvs.isEmpty()); + + // flushing and minor compaction keep delete markers + region.flushcache(); + region.compactStores(); + assertEquals(1, countDeleteMarkers(region)); + region.compactStores(true); + // major compaction deleted it + assertEquals(0, countDeleteMarkers(region)); + } + + /** + * The ExplicitColumnTracker does not support "raw" scanning.
+ */ + public void testRawScanWithColumns() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 0, 3, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + Scan s = new Scan(); + s.setRaw(true); + s.setMaxVersions(); + s.addColumn(c0, c0); + + try { + InternalScanner scan = region.getScanner(s); + fail("raw scanner with columns should have failed"); + } catch (DoNotRetryIOException dnre) { + // ok! + } + } + + /** + * Verify that "raw" scanning mode returns delete markers and deleted rows. + */ + public void testRawScan() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 0, 3, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + region.put(p); + p = new Put(T1, ts+2); + p.add(c0, c0, T2); + region.put(p); + p = new Put(T1, ts+4); + p.add(c0, c0, T3); + region.put(p); + + Delete d = new Delete(T1, ts+1, null); + region.delete(d, null, true); + + d = new Delete(T1, ts+2, null); + d.deleteColumn(c0, c0, ts+2); + region.delete(d, null, true); + + d = new Delete(T1, ts+3, null); + d.deleteColumns(c0, c0, ts+3); + region.delete(d, null, true); + + Scan s = new Scan(); + s.setRaw(true); + s.setMaxVersions(); + InternalScanner scan = region.getScanner(s); + List kvs = new ArrayList(); + scan.next(kvs); + assertTrue(kvs.get(0).isDeleteFamily()); + assertEquals(kvs.get(1).getValue(), T3); + assertTrue(kvs.get(2).isDelete()); + assertTrue(kvs.get(3).isDeleteType()); + assertEquals(kvs.get(4).getValue(), T2); + assertEquals(kvs.get(5).getValue(), T1); + } + + /** + * Verify that delete markers are removed from an otherwise empty store. + */ + public void testDeleteMarkerExpirationEmptyStore() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 0, 1, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + + Delete d = new Delete(T1, ts, null); + d.deleteColumns(c0, c0, ts); + region.delete(d, null, true); + + d = new Delete(T1, ts, null); + d.deleteFamily(c0); + region.delete(d, null, true); + + d = new Delete(T1, ts, null); + d.deleteColumn(c0, c0, ts+1); + region.delete(d, null, true); + + d = new Delete(T1, ts, null); + d.deleteColumn(c0, c0, ts+2); + region.delete(d, null, true); + + // 1 family marker, 1 column marker, 2 version markers + assertEquals(4, countDeleteMarkers(region)); + + // neither flush nor minor compaction removes any marker + region.flushcache(); + assertEquals(4, countDeleteMarkers(region)); + region.compactStores(false); + assertEquals(4, countDeleteMarkers(region)); + + // major compaction removes all, since there are no puts they affect + region.compactStores(true); + assertEquals(0, countDeleteMarkers(region)); + } + + /** + * Test delete marker removal from store files.
+ */ + public void testDeleteMarkerExpiration() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 0, 1, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + region.put(p); + + // a put into another store (CF) should have no effect + p = new Put(T1, ts-10); + p.add(c1, c0, T1); + region.put(p); + + // all the following deletes affect the put + Delete d = new Delete(T1, ts, null); + d.deleteColumns(c0, c0, ts); + region.delete(d, null, true); + + d = new Delete(T1, ts, null); + d.deleteFamily(c0, ts); + region.delete(d, null, true); + + d = new Delete(T1, ts, null); + d.deleteColumn(c0, c0, ts+1); + region.delete(d, null, true); + + d = new Delete(T1, ts, null); + d.deleteColumn(c0, c0, ts+2); + region.delete(d, null, true); + + // 1 family marker, 1 column marker, 2 version markers + assertEquals(4, countDeleteMarkers(region)); + + region.flushcache(); + assertEquals(4, countDeleteMarkers(region)); + region.compactStores(false); + assertEquals(4, countDeleteMarkers(region)); + + // another put will push out the earlier put... + p = new Put(T1, ts+3); + p.add(c0, c0, T1); + region.put(p); + + region.flushcache(); + // no markers are collected, since there is an affected put + region.compactStores(true); + assertEquals(4, countDeleteMarkers(region)); + + // the last collections collected the earlier put + // so after this collection all markers + region.compactStores(true); + assertEquals(0, countDeleteMarkers(region)); + } + + /** + * Verify correct range demarcation + */ + public void testRanges() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 0, 3, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + p.add(c0, c1, T1); + p.add(c1, c0, T1); + p.add(c1, c1, T1); + region.put(p); + + p = new Put(T2, ts); + p.add(c0, c0, T1); + p.add(c0, c1, T1); + p.add(c1, c0, T1); + p.add(c1, c1, T1); + region.put(p); + + p = new Put(T1, ts+1); + p.add(c0, c0, T2); + p.add(c0, c1, T2); + p.add(c1, c0, T2); + p.add(c1, c1, T2); + region.put(p); + + p = new Put(T2, ts+1); + p.add(c0, c0, T2); + p.add(c0, c1, T2); + p.add(c1, c0, T2); + p.add(c1, c1, T2); + region.put(p); + + Delete d = new Delete(T1, ts+1, null); + d.deleteColumns(c0, c0, ts+1); + region.delete(d, null, true); + + d = new Delete(T1, ts+1, null); + d.deleteFamily(c1, ts+1); + region.delete(d, null, true); + + d = new Delete(T2, ts+1, null); + d.deleteFamily(c0, ts+1); + region.delete(d, null, true); + + // add an older delete, to make sure it is filtered + d = new Delete(T1, ts-10, null); + d.deleteFamily(c1, ts-10); + region.delete(d, null, true); + + // ts + 2 does NOT include the delete at ts+1 + checkGet(region, T1, c0, c0, ts+2, T2, T1); + checkGet(region, T1, c0, c1, ts+2, T2, T1); + checkGet(region, T1, c1, c0, ts+2, T2, T1); + checkGet(region, T1, c1, c1, ts+2, T2, T1); + + checkGet(region, T2, c0, c0, ts+2, T2, T1); + checkGet(region, T2, c0, c1, ts+2, T2, T1); + checkGet(region, T2, c1, c0, ts+2, T2, T1); + checkGet(region, T2, c1, c1, ts+2, T2, T1); + + // ts + 3 does + checkGet(region, T1, c0, c0, ts+3); + checkGet(region, T1, c0, c1, ts+3, T2, T1); + checkGet(region, T1, c1, c0, ts+3); + checkGet(region, T1, c1, c1, ts+3); + + checkGet(region, T2, c0, c0, ts+3); + checkGet(region, T2, c0, c1, ts+3); + checkGet(region, T2, c1, c0, 
ts+3, T2, T1); + checkGet(region, T2, c1, c1, ts+3, T2, T1); + } + + /** + * Verify that column/version delete makers are sorted + * with their respective puts and removed correctly by + * versioning (i.e. not relying on the store earliestPutTS). + */ + public void testDeleteMarkerVersioning() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 0, 1, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + region.put(p); + + // this prevents marker collection based on earliestPut + // (cannot keep earliest put per column in the store file) + p = new Put(T1, ts-10); + p.add(c0, c1, T1); + region.put(p); + + Delete d = new Delete(T1, ts, null); + // test corner case (Put and Delete have same TS) + d.deleteColumns(c0, c0, ts); + region.delete(d, null, true); + + d = new Delete(T1, ts+1, null); + d.deleteColumn(c0, c0, ts+1); + region.delete(d, null, true); + + d = new Delete(T1, ts+3, null); + d.deleteColumn(c0, c0, ts+3); + region.delete(d, null, true); + + region.flushcache(); + region.compactStores(true); + region.compactStores(true); + assertEquals(3, countDeleteMarkers(region)); + + // add two more puts, since max version is 1 + // the 2nd put (and all delete markers following) + // will be removed. + p = new Put(T1, ts+2); + p.add(c0, c0, T2); + region.put(p); + + // delete, put, delete, delete, put + assertEquals(3, countDeleteMarkers(region)); + + p = new Put(T1, ts+3); + p.add(c0, c0, T3); + region.put(p); + + // This is potentially questionable behavior. + // This could be changed by not letting the ScanQueryMatcher + // return SEEK_NEXT_COL if a put is past VERSIONS, but instead + // return SKIP if the store has KEEP_DELETED_CELLS set. + // + // As it stands, the 1 here is correct here. + // There are two puts, VERSIONS is one, so after the 1st put the scanner + // knows that there can be no more KVs (put or delete) that have any effect. + // + // delete, put, put | delete, delete + assertEquals(1, countDeleteMarkers(region)); + + // flush cache only sees what is in the memstore + region.flushcache(); + + // Here we have the three markers again, because the flush above + // removed the 2nd put before the file is written. + // So there's only one put, and hence the deletes already in the store + // files cannot be removed safely. 
+ // delete, put, delete, delete + assertEquals(3, countDeleteMarkers(region)); + + region.compactStores(true); + assertEquals(3, countDeleteMarkers(region)); + + // add one more put + p = new Put(T1, ts+4); + p.add(c0, c0, T4); + region.put(p); + + region.flushcache(); + // one trailing delete marker remains (but only one) + // because delete markers do not increase the version count + assertEquals(1, countDeleteMarkers(region)); + region.compactStores(true); + region.compactStores(true); + assertEquals(1, countDeleteMarkers(region)); + } + + /** + * Verify scenarios with multiple CFs and columns + */ + public void testWithMixedCFs() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 0, 1, + HConstants.FOREVER, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis(); + + Put p = new Put(T1, ts); + p.add(c0, c0, T1); + p.add(c0, c1, T1); + p.add(c1, c0, T1); + p.add(c1, c1, T1); + region.put(p); + + p = new Put(T2, ts+1); + p.add(c0, c0, T2); + p.add(c0, c1, T2); + p.add(c1, c0, T2); + p.add(c1, c1, T2); + region.put(p); + + // family markers are each family + Delete d = new Delete(T1, ts, null); + region.delete(d, null, true); + + d = new Delete(T2, ts+1, null); + region.delete(d, null, true); + + Scan s = new Scan(T1); + s.setTimeRange(0, ts+1); + InternalScanner scanner = region.getScanner(s); + List kvs = new ArrayList(); + scanner.next(kvs); + assertEquals(4, kvs.size()); + scanner.close(); + + s = new Scan(T2); + s.setTimeRange(0, ts+2); + scanner = region.getScanner(s); + kvs = new ArrayList(); + scanner.next(kvs); + assertEquals(4, kvs.size()); + scanner.close(); + } + + /** + * Test keeping deleted rows together with min versions set + * @throws Exception + */ + public void testWithMinVersions() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, true); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis() - 2000; // 2s in the past + + Put p = new Put(T1, ts); + p.add(c0, c0, T3); + region.put(p); + p = new Put(T1, ts-1); + p.add(c0, c0, T2); + region.put(p); + p = new Put(T1, ts-3); + p.add(c0, c0, T1); + region.put(p); + p = new Put(T1, ts-4); + p.add(c0, c0, T0); + region.put(p); + + // all puts now are just retained because of min versions = 3 + + // place a family delete marker + Delete d = new Delete(T1, ts-1, null); + region.delete(d, null, true); + // and a column delete marker + d = new Delete(T1, ts-2, null); + d.deleteColumns(c0, c0, ts-1); + region.delete(d, null, true); + + Get g = new Get(T1); + g.setMaxVersions(); + g.setTimeRange(0L, ts-2); + Result r = region.get(g, null); + checkResult(r, c0, c0, T1,T0); + + // 3 families, one column delete marker + assertEquals(4, countDeleteMarkers(region)); + + region.flushcache(); + // no delete marker removes by the flush + assertEquals(4, countDeleteMarkers(region)); + + r = region.get(g, null); + checkResult(r, c0, c0, T1); + p = new Put(T1, ts+1); + p.add(c0, c0, T4); + region.put(p); + region.flushcache(); + + assertEquals(4, countDeleteMarkers(region)); + + r = region.get(g, null); + checkResult(r, c0, c0, T1); + + // this will push out the last put before + // family delete marker + p = new Put(T1, ts+2); + p.add(c0, c0, T5); + region.put(p); + + region.flushcache(); + region.compactStores(true); + // the two family markers without puts are gone + assertEquals(2, countDeleteMarkers(region)); + + // the last compactStores updated the earliestPutTs, + // so after the next 
compaction the last family delete marker is also gone + region.compactStores(true); + assertEquals(0, countDeleteMarkers(region)); + } + + private void checkGet(HRegion region, byte[] row, byte[] fam, byte[] col, + long time, byte[]... vals) throws IOException { + Get g = new Get(row); + g.addColumn(fam, col); + g.setMaxVersions(); + g.setTimeRange(0L, time); + Result r = region.get(g, null); + checkResult(r, fam, col, vals); + + } + + private int countDeleteMarkers(HRegion region) throws IOException { + Scan s = new Scan(); + s.setRaw(true); + s.setMaxVersions(); + InternalScanner scan = region.getScanner(s); + List kvs = new ArrayList(); + int res = 0; + boolean hasMore; + do { + hasMore = scan.next(kvs); + for (KeyValue kv : kvs) { + if(kv.isDelete()) res++; + } + kvs.clear(); + } while (hasMore); + scan.close(); + return res; + } + + private void checkResult(Result r, byte[] fam, byte[] col, byte[] ... vals) { + assertEquals(r.size(), vals.length); + List kvs = r.getColumn(fam, col); + assertEquals(kvs.size(), vals.length); + for (int i=0;i<vals.length;i++) { + assertEquals(Bytes.toString(kvs.get(i).getValue()), Bytes.toString(vals[i])); + } + } +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) List result = new ArrayList(); ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); - StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, - this.memstore.comparator, null, memstorescanners); + ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false, + this.memstore.comparator); + ScanType scanType = ScanType.USER_SCAN; + StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); int count = 0; try { while (s.next(result)) { @@ -111,8 +112,7 @@ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. - s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, - this.memstore.comparator, null, memstorescanners); + s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; try { while (s.next(result)) { @@ -138,8 +138,7 @@ memstorescanners = this.memstore.getScanners(); // Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis(); - s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, - this.memstore.comparator, null, memstorescanners); + s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; int snapshotIndex = 5; try { @@ -553,10 +552,12 @@ } //starting from each row, validate results should contain the starting row for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { - InternalScanner scanner = - new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY, - Integer.MAX_VALUE, this.memstore.comparator, null, - memstore.getScanners()); + ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, false, + this.memstore.comparator); + ScanType scanType = ScanType.USER_SCAN; + InternalScanner scanner = new StoreScanner(new Scan( + Bytes.toBytes(startRowId)), scanInfo, scanType, null, + memstore.getScanners()); List results = new ArrayList(); for (int i = 0; scanner.next(results); i++) { int rowId = startRowId + i; Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1186348) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -24,13 +24,8 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; -import static org.junit.Assert.fail; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -240,11 +235,15 @@ // Multiple versions allowed for an entry, so the delete isn't enough // Lower TTL and expire to ensure that all our entries have been wiped - final int ttlInSeconds = 1; + final int ttl = 1000; for (Store store: this.r.stores.values()) { - store.ttl = ttlInSeconds * 1000; + Store.ScanInfo old = store.scanInfo; + Store.ScanInfo si = new Store.ScanInfo(old.getFamily(), + old.getMinVersions(), old.getMaxVersions(), ttl, + old.getKeepDeletedCells(), old.getComparator()); + store.scanInfo = si; } - Thread.sleep(ttlInSeconds * 1000); + Thread.sleep(1000); r.compactStores(true); int count = count(); @@ -446,11 +445,15 @@ // Multiple versions allowed for an entry, so the delete isn't enough // Lower TTL and expire to ensure that all our entries have been wiped - final int ttlInSeconds = 1; + final int ttl = 1000; for (Store store: this.r.stores.values()) { - store.ttl = ttlInSeconds * 1000; + Store.ScanInfo old = store.scanInfo; + Store.ScanInfo si = new Store.ScanInfo(old.getFamily(), + old.getMinVersions(), old.getMaxVersions(), ttl, + old.getKeepDeletedCells(), old.getComparator()); + store.scanInfo = si; } - Thread.sleep(ttlInSeconds * 1000); + Thread.sleep(ttl); r.compactStores(true); assertEquals(0, count()); Index: src/main/java/org/apache/hadoop/hbase/client/Attributes.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Attributes.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/client/Attributes.java (working copy) @@ -26,6 +26,7 @@ /** * Sets an attribute. * In case value = null attribute is removed from the attributes map. + * Attribute names starting with _ indicate system attributes. 
* @param name attribute name * @param value attribute value */ Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -82,12 +82,13 @@ * execute {@link #setCacheBlocks(boolean)}. */ public class Scan extends OperationWithAttributes implements Writable { + private static final String RAW_ATTR = "_raw_"; + private static final byte SCAN_VERSION = (byte)2; private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; private int maxVersions = 1; private int batch = -1; - // If application wants to collect scan metrics, it needs to // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE)) static public String SCAN_ATTRIBUTES_METRICS_ENABLE = @@ -695,4 +696,26 @@ } return cols.toString(); } + + /** + * Enable/disable "raw" mode for this scan. + * If "raw" is enabled the scan will return all + * delete markers and deleted rows that have not + * been collected yet. + * This is mostly useful for scans on column families + * that have KEEP_DELETED_CELLS enabled. + * It is an error to specify any column when "raw" is set. + * @param raw True/False to enable/disable "raw" mode. + */ + public void setRaw(boolean raw) { + setAttribute(RAW_ATTR, Bytes.toBytes(raw)); + } + + /** + * @return True if this Scan is in "raw" mode. + */ + public boolean isRaw() { + byte[] attr = getAttribute(RAW_ATTR); + return attr == null ? false : Bytes.toBoolean(attr); + } } Index: src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (working copy) @@ -88,6 +88,7 @@ public static final String FOREVER = "FOREVER"; public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE"; public static final String MIN_VERSIONS = "MIN_VERSIONS"; + public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS"; /** * Default compression type. @@ -117,6 +118,11 @@ public static final boolean DEFAULT_IN_MEMORY = false; /** + * Default setting for preventing deleted cells from being collected immediately. + */ + public static final boolean DEFAULT_KEEP_DELETED = false; + + /** + * Default setting for whether to use a block cache or not.
*/ public static final boolean DEFAULT_BLOCKCACHE = true; @@ -151,6 +157,7 @@ DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE)); DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY)); DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE)); + DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED)); } // Column family name @@ -265,8 +272,9 @@ final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, final int timeToLive, final String bloomFilter, final int scope) { - this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, compression, inMemory, - blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope); + this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, DEFAULT_KEEP_DELETED, + compression, inMemory, blockCacheEnabled, blocksize, timeToLive, + bloomFilter, scope); } /** @@ -292,8 +300,9 @@ * a : * @throws IllegalArgumentException if the number of versions is <= 0 */ - public HColumnDescriptor(final byte [] familyName, final int minVersions, - final int maxVersions, final String compression, final boolean inMemory, + public HColumnDescriptor(final byte[] familyName, final int minVersions, + final int maxVersions, final boolean keepDeletedRows, + final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, final int timeToLive, final String bloomFilter, final int scope) { isLegalFamilyName(familyName); @@ -309,14 +318,15 @@ if (timeToLive == HConstants.FOREVER) { throw new IllegalArgumentException("Minimum versions requires TTL."); } - if (minVersions > maxVersions) { - throw new IllegalArgumentException("Minimum versions must be <= "+ - "maximum versions."); + if (minVersions >= maxVersions) { + throw new IllegalArgumentException("Minimum versions must be < " + + "maximum versions."); } } setMaxVersions(maxVersions); setMinVersions(minVersions); + setKeepDeletedRows(keepDeletedRows); setInMemory(inMemory); setBlockCacheEnabled(blockCacheEnabled); setTimeToLive(timeToLive); @@ -542,7 +552,23 @@ setValue(HConstants.IN_MEMORY, Boolean.toString(inMemory)); } + public boolean getKeepDeletedCells() { + String value = getValue(KEEP_DELETED_CELLS); + if (value != null) { + return Boolean.valueOf(value).booleanValue(); + } + return DEFAULT_KEEP_DELETED; + } + /** + * @param keepDeletedRows True if deleted rows should not be collected + * immediately. + */ + public void setKeepDeletedRows(boolean keepDeletedRows) { + setValue(KEEP_DELETED_CELLS, Boolean.toString(keepDeletedRows)); + } + + /** * @return Time-to-live of cell contents, in seconds. 
*/ public int getTimeToLive() { Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.NavigableSet; + import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; @@ -29,8 +32,7 @@ import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult; import org.apache.hadoop.hbase.util.Bytes; -import java.io.IOException; -import java.util.NavigableSet; +import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; /** * A query matcher that is specifically designed for the scan case. @@ -39,73 +41,105 @@ // Optimization so we can skip lots of compares when we decide to skip // to the next row. private boolean stickyNextRow; - private byte[] stopRow; + private final byte[] stopRow; - protected TimeRange tr; + private final TimeRange tr; - protected Filter filter; + private final Filter filter; /** Keeps track of deletes */ - protected DeleteTracker deletes; - protected boolean retainDeletesInOutput; + private final DeleteTracker deletes; + /* + * The following three booleans define how we deal with deletes. + * There are three different aspects: + * 1. Whether to keep delete markers. This is used in compactions. + * Minor compactions always keep delete markers. + * 2. Whether to keep deleted rows. This is also used in compactions, + * if the store is set to keep deleted rows. This implies keeping + * the delete markers as well. + * In this case deleted rows are subject to the normal max version + * and TTL/min version rules just like "normal" rows. + * 3. Whether a scan can do time travel queries even before deleted + * marker to reach deleted rows. + */ + /** whether to retain delete markers */ + private final boolean retainDeletesInOutput; + /** whether to return deleted rows */ + private final boolean keepDeletedCells; + /** whether time range queries can see rows "behind" a delete */ + private final boolean seePastDeleteMarkers; + + /** Keeps track of columns and versions */ - protected ColumnTracker columns; + private final ColumnTracker columns; /** Key to seek to in memstore and StoreFiles */ - protected KeyValue startKey; + private final KeyValue startKey; /** Row comparator for the region this query is for */ - KeyValue.KeyComparator rowComparator; + private final KeyValue.KeyComparator rowComparator; + /* row is not private for tests */ /** Row the query is on */ - protected byte [] row; + byte [] row; + + /** + * Oldest put in any of the involved store files + * Used to decide whether it is ok to delete + * family delete marker of this store keeps + * deleted KVs. + */ + private final long earliestPutTs; /** - * Constructs a ScanQueryMatcher for a Scan. + * Construct a QueryMatcher for a scan * @param scan - * @param family + * @param scanInfo The store's immutable scan info * @param columns - * @param ttl - * @param rowComparator + * @param scanType Type of the scan + * @param earliestPutTs Earliest put seen in any of the store files. 
*/ - public ScanQueryMatcher(Scan scan, byte [] family, - NavigableSet columns, long ttl, - KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions, - boolean retainDeletesInOutput) { + public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo, + NavigableSet columns, StoreScanner.ScanType scanType, + long earliestPutTs) { this.tr = scan.getTimeRange(); - this.rowComparator = rowComparator; + this.rowComparator = scanInfo.getComparator().getRawComparator(); this.deletes = new ScanDeleteTracker(); this.stopRow = scan.getStopRow(); - this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), family, null); + this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), scanInfo.getFamily(), null); this.filter = scan.getFilter(); - this.retainDeletesInOutput = retainDeletesInOutput; + this.earliestPutTs = earliestPutTs; + /* how to deal with deletes */ + // keep deleted cells: if compaction or raw scan + this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && scanType != ScanType.USER_SCAN) || scan.isRaw(); + // retain deletes: if minor compaction or raw scan + this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw(); + // seePastDeleteMarker: user initiated scans + this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && scanType == ScanType.USER_SCAN; + + int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions()); // Single branch to deal with two types of reads (columns vs all in family) if (columns == null || columns.size() == 0) { // use a specialized scan for wildcard column tracker. - this.columns = new ScanWildcardColumnTracker(minVersions, maxVersions, ttl); + this.columns = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions, scanInfo.getTtl()); } else { // We can share the ExplicitColumnTracker, diff is we reset // between rows, not between storefiles. - this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions, - ttl); + this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, + scanInfo.getTtl()); } } - public ScanQueryMatcher(Scan scan, byte [] family, - NavigableSet columns, long ttl, - KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) { - /* By default we will not include deletes */ - /* deletes are included explicitly (for minor compaction) */ - this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions, - false); + /* + * Constructor for tests + */ + ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo, + NavigableSet columns) { + this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN, + HConstants.LATEST_TIMESTAMP); } - public ScanQueryMatcher(Scan scan, byte [] family, - NavigableSet columns, long ttl, - KeyValue.KeyComparator rowComparator, int maxVersions) { - this(scan, family, columns, ttl, rowComparator, 0, maxVersions); - } /** * Determines if the caller should do one of several things: @@ -171,21 +205,50 @@ return columns.getNextRowOrNextColumn(bytes, offset, qualLength); } + /* + * The delete logic is pretty complicated now. + * This is corroborated by the following: + * 1. The store might be instructed to keep deleted rows around. + * 2. A scan can optionally see past a delete marker now. + * 3. If deleted rows are kept, we have to find out when we can + * remove the delete markers. + * 4. Family delete markers are always first (regardless of their TS) + * 5. Delete markers should not be counted as version + * 6. Delete markers affect puts of the *same* TS + * 7. 
Delete marker need to be version counted together with puts + * they affect + */ byte type = kv.getType(); - if (isDelete(type)) { - if (tr.withinOrAfterTimeRange(timestamp)) { - this.deletes.add(bytes, offset, qualLength, timestamp, type); + if (kv.isDelete()) { + if (!keepDeletedCells) { + // first ignore delete markers if the scanner can do so, and the + // range does not include the marker + boolean includeDeleteMarker = seePastDeleteMarkers ? + // +1, to allow a range between a delete and put of same TS + tr.withinTimeRange(timestamp+1) : + tr.withinOrAfterTimeRange(timestamp); + if (includeDeleteMarker) { + this.deletes.add(bytes, offset, qualLength, timestamp, type); + } // Can't early out now, because DelFam come before any other keys } if (retainDeletesInOutput) { + // always include return MatchCode.INCLUDE; - } - else { + } else if (keepDeletedCells) { + if (timestamp < earliestPutTs) { + // keeping delete rows, but there are no puts older than + // this delete in the store files. + return columns.getNextRowOrNextColumn(bytes, offset, qualLength); + } + // else: fall through and do version counting on the + // delete markers + } else { return MatchCode.SKIP; } - } - - if (!this.deletes.isEmpty()) { + // note the following next else if... + // delete marker are not subject to other delete markers + } else if (!this.deletes.isEmpty()) { DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength, timestamp); switch (deleteResult) { @@ -228,7 +291,8 @@ } } - MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp); + MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, + timestamp, type); /* * According to current implementation, colChecker can only be * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return @@ -269,11 +333,6 @@ stickyNextRow = false; } - // should be in KeyValue. - protected boolean isDelete(byte type) { - return (type != KeyValue.Type.Put.getCode()); - } - /** * * @return the start key Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.regionserver; -import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; @@ -86,12 +87,14 @@ * @throws IOException */ StoreScanner(Store store, Scan scan, final NavigableSet columns) - throws IOException { + throws IOException { this(store, scan.getCacheBlocks(), scan, columns); - matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), - columns, store.ttl, store.comparator.getRawComparator(), - store.minVersions, store.versionsToReturn(scan.getMaxVersions()), - false); + if (columns != null && scan.isRaw()) { + throw new DoNotRetryIOException( + "Cannot specify any column for a raw scan"); + } + matcher = new ScanQueryMatcher(scan, store.scanInfo, columns, + ScanType.USER_SCAN, HConstants.LATEST_TIMESTAMP); // Pass columns to try to filter out unnecessary StoreFiles. 
List scanners = getScanners(scan, columns); @@ -124,12 +127,12 @@ * @param scan the spec * @param scanners ancilliary scanners */ - StoreScanner(Store store, Scan scan, List scanners, - boolean retainDeletesInOutput) throws IOException { + StoreScanner(Store store, Scan scan, + List scanners, ScanType scanType, + long earliestPutTs) throws IOException { this(store, false, scan, null); - matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), - null, store.ttl, store.comparator.getRawComparator(), store.minVersions, - store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput); + matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType, + earliestPutTs); // Seek all scanners to the initial key for(KeyValueScanner scanner : scanners) { @@ -141,20 +144,18 @@ } // Constructor for testing. - StoreScanner(final Scan scan, final byte [] colFamily, final long ttl, - final KeyValue.KVComparator comparator, - final NavigableSet columns, - final List scanners) - throws IOException { + StoreScanner(final Scan scan, Store.ScanInfo scanInfo, + StoreScanner.ScanType scanType, final NavigableSet columns, + final List scanners) throws IOException { this(null, scan.getCacheBlocks(), scan, columns); - this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, - comparator.getRawComparator(), 0, scan.getMaxVersions(), false); + this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, + HConstants.LATEST_TIMESTAMP); // Seek all scanners to the initial key - for(KeyValueScanner scanner : scanners) { + for (KeyValueScanner scanner : scanners) { scanner.seek(matcher.getStartKey()); } - heap = new KeyValueHeap(scanners, comparator); + heap = new KeyValueHeap(scanners, scanInfo.getComparator()); } /* @@ -476,5 +477,13 @@ lazySeekEnabledGlobally = enable; } + /** + * Enum to distinguish general scan types. + */ + public static enum ScanType { + MAJOR_COMPACT, + MINOR_COMPACT, + USER_SCAN + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (working copy) @@ -24,6 +24,7 @@ import java.util.NavigableSet; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; @@ -97,16 +98,14 @@ } /** - * Checks against the parameters of the query and the columns which have - * already been processed by this query. - * @param bytes KeyValue buffer - * @param offset offset to the start of the qualifier - * @param length length of the qualifier - * @param timestamp timestamp of the key being checked - * @return MatchCode telling ScanQueryMatcher what action to take + * {@inheritDoc} */ + @Override public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long timestamp) { + int length, long timestamp, byte type) { + // delete markers should never be passed to an + // *Explicit*ColumnTracker + assert !KeyValue.isDelete(type); do { // No more columns left, we are done with this query if(this.columns.size() == 0) { @@ -143,12 +142,12 @@ if (this.columns.size() == this.index) { // We have served all the requested columns. 
this.column = null; - return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; } else { - // We are done with current column; advance to next column - // of interest. + // We are done with current column; advance to next column + // of interest. this.column = this.columns.get(this.index); - return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL; + return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL; } } else { setTS(timestamp); Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (working copy) @@ -22,9 +22,8 @@ import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; @@ -32,17 +31,17 @@ * Keeps track of the columns for a scan if they are not explicitly specified */ public class ScanWildcardColumnTracker implements ColumnTracker { - private static final Log LOG = - LogFactory.getLog(ScanWildcardColumnTracker.class); private byte [] columnBuffer = null; private int columnOffset = 0; private int columnLength = 0; private int currentCount = 0; private int maxVersions; private int minVersions; - /* Keeps track of the latest timestamp included for current column. + /* Keeps track of the latest timestamp and type included for current column. * Used to eliminate duplicates. */ private long latestTSOfCurrentColumn; + private byte latestTypeOfCurrentColumn; + private long oldestStamp; /** @@ -58,41 +57,38 @@ } /** - * Can only return INCLUDE or SKIP, since returning "NEXT" or - * "DONE" would imply we have finished with this row, when - * this class can't figure that out. - * - * @param bytes - * @param offset - * @param length - * @param timestamp - * @return The match code instance. + * {@inheritDoc} + * This receives puts *and* deletes. + * Deletes do not count as a version, but rather take the version + * of the previous put (so eventually all but the last can be reclaimed). */ @Override public MatchCode checkColumn(byte[] bytes, int offset, int length, - long timestamp) throws IOException { + long timestamp, byte type) throws IOException { + if (columnBuffer == null) { // first iteration. 
resetBuffer(bytes, offset, length); - return checkVersion(++currentCount, timestamp); + // do not count a delete marker as another version + return checkVersion(type, timestamp); } int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer, columnOffset, columnLength); if (cmp == 0) { //If column matches, check if it is a duplicate timestamp - if (sameAsPreviousTS(timestamp)) { + if (sameAsPreviousTSAndType(timestamp, type)) { return ScanQueryMatcher.MatchCode.SKIP; } - return checkVersion(++currentCount, timestamp); + return checkVersion(type, timestamp); } - resetTS(); + resetTSAndType(); // new col > old col if (cmp > 0) { // switched columns, lets do something.x resetBuffer(bytes, offset, length); - return checkVersion(++currentCount, timestamp); + return checkVersion(type, timestamp); } // new col < oldcol @@ -112,13 +108,25 @@ currentCount = 0; } - private MatchCode checkVersion(int version, long timestamp) { - if (version > maxVersions) { + /** + * Check whether this version should be retained. + * There are 4 variables considered: + * If this version is past max versions -> skip it + * If this kv has expired or was deleted, check min versions + * to decide whether to skip it or not. + * + * Increase the version counter unless this is a delete + */ + private MatchCode checkVersion(byte type, long timestamp) { + if (!KeyValue.isDelete(type)) { + currentCount++; + } + if (currentCount > maxVersions) { return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col } // keep the KV if required by minversions or it is not expired, yet - if (version <= minVersions || !isExpired(timestamp)) { - setTS(timestamp); + if (currentCount <= minVersions || !isExpired(timestamp)) { + setTSAndType(timestamp, type); return ScanQueryMatcher.MatchCode.INCLUDE; } else { return MatchCode.SEEK_NEXT_COL; @@ -136,19 +144,21 @@ @Override public void reset() { columnBuffer = null; - resetTS(); + resetTSAndType(); } - private void resetTS() { + private void resetTSAndType() { latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP; + latestTypeOfCurrentColumn = 0; } - private void setTS(long timestamp) { + private void setTSAndType(long timestamp, byte type) { latestTSOfCurrentColumn = timestamp; + latestTypeOfCurrentColumn = type; } - private boolean sameAsPreviousTS(long timestamp) { - return timestamp == latestTSOfCurrentColumn; + private boolean sameAsPreviousTSAndType(long timestamp, byte type) { + return timestamp == latestTSOfCurrentColumn && type == latestTypeOfCurrentColumn; } private boolean isExpired(long timestamp) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; @@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import
org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; @@ -95,9 +97,7 @@ final Configuration conf; final CacheConfig cacheConf; // ttl in milliseconds. - protected long ttl; - protected int minVersions; - protected int maxVersions; + private long ttl; long majorCompactionTime; private final int minFilesToCompact; private final int maxFilesToCompact; @@ -119,6 +119,8 @@ private CompactionProgress progress; private final int compactionKVMax; + // not private for testing + /* package */ScanInfo scanInfo; /* * List of store files inside this store. This is an immutable list that * is atomically replaced when its contents change. @@ -183,8 +185,9 @@ // second -> ms adjust for user data this.ttl *= 1000; } - this.minVersions = family.getMinVersions(); - this.maxVersions = family.getMaxVersions(); + scanInfo = new ScanInfo(family.getName(), family.getMinVersions(), + family.getMaxVersions(), ttl, family.getKeepDeletedCells(), + this.comparator); this.memstore = new MemStore(conf, this.comparator); this.storeNameStr = Bytes.toString(this.family.getName()); @@ -477,13 +480,13 @@ return null; } Scan scan = new Scan(); - scan.setMaxVersions(maxVersions); + scan.setMaxVersions(scanInfo.getMaxVersions()); // Use a store scanner to find which rows to flush. // Note that we need to retain deletes, hence - // pass true as the StoreScanner's retainDeletesInOutput argument. - InternalScanner scanner = new StoreScanner(this, scan, - Collections.singletonList(new CollectionBackedScanner(set, - this.comparator)), true); + // treat this as a minor compaction. + InternalScanner scanner = new StoreScanner(this, scan, Collections + .singletonList(new CollectionBackedScanner(set, this.comparator)), + ScanType.MINOR_COMPACT, HConstants.OLDEST_TIMESTAMP); try { // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem @@ -1108,6 +1111,7 @@ throws IOException { // calculate maximum key count after compaction (for blooms) int maxKeyCount = 0; + long earliestPutTs = HConstants.LATEST_TIMESTAMP; for (StoreFile file : filesToCompact) { StoreFile.Reader r = file.getReader(); if (r != null) { @@ -1123,6 +1127,19 @@ ", size=" + StringUtils.humanReadableInt(r.length()) ); } } + // For major compactions calculate the earliest put timestamp + // of all involved storefiles. This is used to remove + // family delete marker during the compaction. + if (majorCompaction) { + byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS); + if (tmp == null) { + // there's a file with no information, must be an old one + // assume we have very old puts + earliestPutTs = HConstants.OLDEST_TIMESTAMP; + } else { + earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp)); + } + } } // keep track of compaction progress @@ -1141,7 +1158,9 @@ Scan scan = new Scan(); scan.setMaxVersions(family.getMaxVersions()); /* include deletes, unless we are doing a major compaction */ - scanner = new StoreScanner(this, scan, scanners, !majorCompaction); + scanner = new StoreScanner(this, scan, scanners, + majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, + earliestPutTs); if (region.getCoprocessorHost() != null) { InternalScanner cpScanner = region.getCoprocessorHost().preCompact( this, scanner); @@ -1374,7 +1393,7 @@ // at all (expired or not) has at least one version that will not expire. // Note that this method used to take a KeyValue as arguments. 
KeyValue // can be back-dated, a row key cannot. - long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl; + long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.ttl; KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP); @@ -1842,15 +1861,16 @@ return this.cacheConf; } - public static final long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + - (7 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) + - (7 * Bytes.SIZEOF_INT) + (1 * Bytes.SIZEOF_BOOLEAN)); + public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG) + + (1 * Bytes.SIZEOF_DOUBLE) + (5 * Bytes.SIZEOF_INT) + + Bytes.SIZEOF_BOOLEAN); - public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + - ClassSize.CONCURRENT_SKIPLISTMAP + - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT); + public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + + ClassSize.CONCURRENT_SKIPLISTMAP + + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + + ScanInfo.FIXED_OVERHEAD); @Override public long heapSize() { @@ -1861,4 +1881,62 @@ return comparator; } + /** + * Immutable information for scans over a store. + */ + public static class ScanInfo { + private byte[] family; + private int minVersions; + private int maxVersions; + private long ttl; + private boolean keepDeletedCells; + private KVComparator comparator; + + public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT) + + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN); + + /** + * @param family Name of this store's column family + * @param minVersions Store's MIN_VERSIONS setting + * @param maxVersions Store's VERSIONS setting + * @param ttl Store's TTL (in ms) + * @param keepDeletedCells Store's keepDeletedCells setting + * @param comparator The store's comparator + */ + public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl, + boolean keepDeletedCells, KVComparator comparator) { + + this.family = family; + this.minVersions = minVersions; + this.maxVersions = maxVersions; + this.ttl = ttl; + this.keepDeletedCells = keepDeletedCells; + this.comparator = comparator; + } + + public byte[] getFamily() { + return family; + } + + public int getMinVersions() { + return minVersions; + } + + public int getMaxVersions() { + return maxVersions; + } + + public long getTtl() { + return ttl; + } + + public boolean getKeepDeletedCells() { + return keepDeletedCells; + } + + public KVComparator getComparator() { + return comparator; + } + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; @@ -116,6 +117,9 @@ /** Key for Timerange information in metadata*/ public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE"); + /** Key for timestamp of 
earliest-put in metadata*/ + public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS"); + // Make default block size for StoreFiles 8k while testing. TODO: FIX! // Need to make it 8k for testing. public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024; @@ -737,6 +741,7 @@ private int lastBloomKeyOffset, lastBloomKeyLen; private KVComparator kvComparator; private KeyValue lastKv = null; + private long earliestPutTs = HConstants.LATEST_TIMESTAMP; TimeRangeTracker timeRangeTracker = new TimeRangeTracker(); /* isTimeRangeTrackerSet keeps track if the timeRange has already been set @@ -796,14 +801,15 @@ writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); - appendTimeRangeMetadata(); + appendTrackedTimestampsToMetadata(); } /** - * Add TimestampRange to Metadata + * Add TimestampRange and earliest put timestamp to Metadata */ - public void appendTimeRangeMetadata() throws IOException { + public void appendTrackedTimestampsToMetadata() throws IOException { appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker)); + appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); } /** @@ -816,29 +822,22 @@ } /** + * Record the earliest Put timestamp. + * + * If the timeRangeTracker is not set, * update TimeRangeTracker to include the timestamp of this key * @param kv * @throws IOException */ - public void includeInTimeRangeTracker(final KeyValue kv) { + public void trackTimestamps(final KeyValue kv) { + if (KeyValue.Type.Put.getCode() == kv.getType()) { + earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp()); + } if (!isTimeRangeTrackerSet) { timeRangeTracker.includeTimestamp(kv); } } - /** - * If the timeRangeTracker is not set, - * update TimeRangeTracker to include the timestamp of this key - * @param key - * @throws IOException - */ - public void includeInTimeRangeTracker(final byte [] key) { - if (!isTimeRangeTrackerSet) { - timeRangeTracker.includeTimestamp(key); - } - } - public void append(final KeyValue kv) throws IOException { if (this.bloomFilterWriter != null) { // only add to the bloom filter on a new, unique key @@ -908,7 +907,7 @@ } } writer.append(kv); - includeInTimeRangeTracker(kv); + trackTimestamps(kv); } public Path getPath() { Index: src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (working copy) @@ -47,12 +47,14 @@ * @param offset * @param length * @param ttl The timeToLive to enforce. + * @param type The type of the KeyValue * @return The match code instance. * @throws IOException in case there is an internal consistency problem * caused by a data corruption.
*/ - public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long ttl) throws IOException; + public ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset, + int length, long ttl, byte type) + throws IOException; /** * Updates internal variables in between files Index: src/main/java/org/apache/hadoop/hbase/KeyValue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1186348) +++ src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy) @@ -207,6 +207,14 @@ // the row cached private volatile byte [] rowCache = null; + /** + * @return True if a delete type, a {@link KeyValue.Type#Delete} or + * a {@link KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn} + * KeyValue type. + */ + public static boolean isDelete(byte t) { + return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode(); + } /** Here be dragons **/ @@ -1038,8 +1046,7 @@ * KeyValue type. */ public boolean isDelete() { - int t = getType(); - return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode(); + return KeyValue.isDelete(getType()); } /** Index: src/main/ruby/hbase/admin.rb =================================================================== --- src/main/ruby/hbase/admin.rb (revision 1186348) +++ src/main/ruby/hbase/admin.rb (working copy) @@ -463,6 +463,7 @@ family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE) family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS) family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS) + family.setKeepDeletedCells(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER) bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype)
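
Illustrative sketch (not part of the patch): how the three delete-handling flags set in the new ScanQueryMatcher constructor fall out for each ScanType when the family has KEEP_DELETED_CELLS enabled and the scan is not raw. The class and method names below are hypothetical; only the boolean expressions mirror the patch.

public class DeleteHandlingFlags {
  enum ScanType { MAJOR_COMPACT, MINOR_COMPACT, USER_SCAN }

  // Same expressions as the ScanQueryMatcher constructor in this patch.
  static String flagsFor(ScanType scanType, boolean keepDeletedCells, boolean rawScan) {
    boolean keep = (keepDeletedCells && scanType != ScanType.USER_SCAN) || rawScan;
    boolean retain = scanType == ScanType.MINOR_COMPACT || rawScan;
    boolean seePast = keepDeletedCells && scanType == ScanType.USER_SCAN;
    return scanType + ": keepDeletedCells=" + keep + ", retainDeletesInOutput=" + retain
        + ", seePastDeleteMarkers=" + seePast;
  }

  public static void main(String[] args) {
    System.out.println(flagsFor(ScanType.USER_SCAN, true, false));     // see past markers, do not emit them
    System.out.println(flagsFor(ScanType.MINOR_COMPACT, true, false)); // keep markers and emit them
    System.out.println(flagsFor(ScanType.MAJOR_COMPACT, true, false)); // keep markers; drop those older than earliestPutTs
  }
}

In the MAJOR_COMPACT case the markers are still version-counted, but the earliestPutTs passed to the matcher (computed from the EARLIEST_PUT_TS file metadata above) lets the compaction skip any marker that is older than every Put in the files being rewritten.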
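
Similarly, a minimal sketch of constructing the new Store.ScanInfo value object directly, assuming the constructor added in the Store.java hunk above; the family name and settings here are made up for illustration (Store itself derives these from the HColumnDescriptor and converts the TTL from seconds to milliseconds).

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanInfoExample {
  public static Store.ScanInfo example() {
    return new Store.ScanInfo(
        Bytes.toBytes("f"),        // column family name
        0,                         // MIN_VERSIONS
        3,                         // VERSIONS
        24L * 60 * 60 * 1000,      // TTL in milliseconds
        true,                      // KEEP_DELETED_CELLS
        KeyValue.COMPARATOR);      // the store's KVComparator
  }
}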