Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompactingStoreScanner.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompactingStoreScanner.java (revision 1032175)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompactingStoreScanner.java (working copy)
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueTestUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
-
-public class TestMinorCompactingStoreScanner extends TestCase {
-
-  public void testDeleteFamiliy() throws IOException {
-    KeyValue[] kvs = new KeyValue[] {
-        KeyValueTestUtil.create("R1", "cf", "a", 100, KeyValue.Type.DeleteFamily, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "e", 11, KeyValue.Type.DeleteColumn, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "g", 11, KeyValue.Type.Delete, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
-    };
-    List scanners = scanFixture(kvs);
-
-    InternalScanner scan =
-        new MinorCompactingStoreScanner("cf", KeyValue.COMPARATOR, scanners);
-    List results = new ArrayList();
-    assertTrue(scan.next(results));
-    assertEquals(11, results.size());
-    assertEquals(kvs[0], results.get(0));
-    assertEquals(kvs[1], results.get(1));
-    assertEquals(kvs[2], results.get(2));
-    assertEquals(kvs[3], results.get(3));
-    assertEquals(kvs[5], results.get(4));
-    assertEquals(kvs[4], results.get(5));
-    assertEquals(kvs[6], results.get(6));
-    assertEquals(kvs[8], results.get(7));
-    assertEquals(kvs[7], results.get(8));
-    assertEquals(kvs[9], results.get(9));
-    assertEquals(kvs[10], results.get(10));
-
-    results.clear();
-    assertFalse(scan.next(results));
-    assertEquals(1, results.size());
-    assertEquals(kvs[kvs.length-1], results.get(0));
-  }
-
-  public void testDeleteVersion() throws IOException {
-    KeyValue[] kvs = new KeyValue[] {
-        KeyValueTestUtil.create("R1", "cf", "a", 15, KeyValue.Type.Put, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "a", 10, KeyValue.Type.Delete, "dont-care"),
-        KeyValueTestUtil.create("R1", "cf", "a", 10, KeyValue.Type.Put, "dont-care")
-    };
-    List scanners = scanFixture(kvs);
-    InternalScanner scan =
-        new MinorCompactingStoreScanner("cf", KeyValue.COMPARATOR, scanners);
-    List results = new ArrayList();
-    assertFalse(scan.next(results));
-    assertEquals(3, results.size());
-    assertEquals(kvs[0], results.get(0));
-    assertEquals(kvs[1], results.get(1));
-    assertEquals(kvs[2], results.get(2));
-  }
-}
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1032175)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy)
@@ -58,18 +58,31 @@
   private static final byte [] COLUMN_FAMILY = fam1;
   private final byte [] STARTROW = Bytes.toBytes(START_KEY);
   private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
-  private static final int COMPACTION_THRESHOLD = MAXVERSIONS;
+  private int compactionThreshold;
+  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
+  final private byte[] col1, col2;
 
   private MiniDFSCluster cluster;
 
   /** constructor */
-  public TestCompaction() {
+  public TestCompaction() throws Exception {
     super();
 
     // Set cache flush size to 1MB
    conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024);
     conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
     this.cluster = null;
+    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    // Increment the least significant character so we get to next row.
+    secondRowBytes[START_KEY_BYTES.length - 1]++;
+    thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    thirdRowBytes[START_KEY_BYTES.length - 1]++;
+    thirdRowBytes[START_KEY_BYTES.length - 1]++;
+    col1 = "column1".getBytes(HConstants.UTF8_ENCODING);
+    col2 = "column2".getBytes(HConstants.UTF8_ENCODING);
   }
 
   @Override
@@ -102,7 +115,7 @@
    */
   public void testMajorCompactingToNoOutput() throws IOException {
     createStoreFile(r);
-    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+    for (int i = 0; i < compactionThreshold; i++) {
       createStoreFile(r);
     }
     // Now delete everything.
@@ -133,43 +146,35 @@
    * Assert deletes get cleaned up.
    * @throws Exception
    */
-  public void testCompaction() throws Exception {
+  public void testMajorCompaction() throws Exception {
     createStoreFile(r);
-    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+    for (int i = 0; i < compactionThreshold; i++) {
       createStoreFile(r);
     }
-    // Add more content. Now there are about 5 versions of each column.
+    // Add more content.
+    addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));
+
+    // Now there are about 5 versions of each column.
     // Default is that there only 3 (MAXVERSIONS) versions allowed per column.
+    //
     // Assert == 3 when we ask for versions.
-    addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));
-
-
-    // FIX!!
-//    Cell[] cellValues =
-//      Cell.createSingleCellArray(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
     Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
-    // Assert that I can get 3 versions since it is the max I should get
-    assertEquals(COMPACTION_THRESHOLD, result.size());
-//    assertEquals(cellValues.length, 3);
     r.flushcache();
-    r.compactStores();
-    // Always 3 versions if that is what max versions is.
+    r.compactStores(true);
+
+    // look at the second row
+    // Increment the least significant character so we get to next row.
     byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
-    // Increment the least significant character so we get to next row.
     secondRowBytes[START_KEY_BYTES.length - 1]++;
-    // FIX
+
+    // Always 3 versions if that is what max versions is.
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
-    // Assert that I can get 3 versions since it is the max I should get
-    assertEquals(3, result.size());
-//
-//    cellValues = Cell.createSingleCellArray(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100/*Too many*/));
-//    LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " +
-//      cellValues.length);
-//    assertTrue(cellValues.length == 3);
-
-    // Now add deletes to memstore and then flush it. That will put us over
+    // Now add deletes to memstore and then flush it.
+    // That will put us over
     // the compaction threshold of 3 store files. Compacting these store files
     // should result in a compacted store file that has no references to the
     // deleted row.
@@ -179,52 +184,33 @@
     r.delete(delete, null, true);
 
     // Assert deleted.
-    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
+    assertTrue("Second row should have been deleted", result.isEmpty());
+    r.flushcache();
-    r.flushcache();
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
+    assertTrue("Second row should have been deleted", result.isEmpty());
 
     // Add a bit of data and flush. Start adding at 'bbb'.
     createSmallerStoreFile(this.r);
     r.flushcache();
     // Assert that the second row is still deleted.
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
+    assertTrue("Second row should still be deleted", result.isEmpty());
 
     // Force major compaction.
     r.compactStores(true);
     assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
+    assertTrue("Second row should still be deleted", result.isEmpty());
 
     // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
     // Also, that compacted store files do not have any secondRowBytes because
     // they were deleted.
-    int count = 0;
-    boolean containsStartRow = false;
-    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
-      HFileScanner scanner = f.getReader().getScanner(false, false);
-      scanner.seekTo();
-      do {
-        byte [] row = scanner.getKeyValue().getRow();
-        if (Bytes.equals(row, STARTROW)) {
-          containsStartRow = true;
-          count++;
-        } else {
-          // After major compaction, should be none of these rows in compacted
-          // file.
-          assertFalse(Bytes.equals(row, secondRowBytes));
-        }
-      } while(scanner.next());
-    }
-    assertTrue(containsStartRow);
-    assertTrue(count == 3);
-
+    verifyCounts(3,0);
+
     // Multiple versions allowed for an entry, so the delete isn't enough
     // Lower TTL and expire to ensure that all our entries have been wiped
     final int ttlInSeconds = 1;
@@ -234,13 +220,145 @@
     Thread.sleep(ttlInSeconds * 1000);
     r.compactStores(true);
-    count = count();
-    assertTrue(count == 0);
+    int count = count();
+    assertTrue("Should not see anything after TTL has expired", count == 0);
   }
-
+  public void testMinorCompactionWithDeleteRow() throws Exception {
+    Delete deleteRow = new Delete(secondRowBytes);
+    testMinorCompactionWithDelete(deleteRow);
+  }
+  public void testMinorCompactionWithDeleteColumn1() throws Exception {
+    Delete dc = new Delete(secondRowBytes);
+    /* delete all timestamps in the column */
+    dc.deleteColumns(fam2, col2);
+    testMinorCompactionWithDelete(dc);
+  }
+  public void testMinorCompactionWithDeleteColumn2() throws Exception {
+    Delete dc = new Delete(secondRowBytes);
+    dc.deleteColumn(fam2, col2);
+    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
+     * We only delete the latest version. One might expect to see only
+     * versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
+     * This is okay as well. Since there was no compaction done before the
+     * delete, version 0 seems to stay on.
+     */
+    //testMinorCompactionWithDelete(dc, 2);
+    testMinorCompactionWithDelete(dc, 3);
+  }
+  public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
+    Delete deleteCF = new Delete(secondRowBytes);
+    deleteCF.deleteFamily(fam2);
+    testMinorCompactionWithDelete(deleteCF);
+  }
+  public void testMinorCompactionWithDeleteVersion1() throws Exception {
+    Delete deleteVersion = new Delete(secondRowBytes);
+    deleteVersion.deleteColumns(fam2, col2, 2);
+    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
+     * We delete versions 0 ... 2. So, we still have one remaining.
+     */
+    testMinorCompactionWithDelete(deleteVersion, 1);
+  }
+  public void testMinorCompactionWithDeleteVersion2() throws Exception {
+    Delete deleteVersion = new Delete(secondRowBytes);
+    deleteVersion.deleteColumn(fam2, col2, 1);
+    /*
+     * The table has 4 versions: 0, 1, 2, and 3.
+     * 0 does not count.
+     * We delete 1.
+     * Should have 2 remaining.
+     */
+    testMinorCompactionWithDelete(deleteVersion, 2);
+  }
+
+  /*
+   * A helper function to test the minor compaction algorithm. We check that
+   * the delete markers are left behind. Takes a delete as an argument, which
+   * can be any delete (row, column, column family, etc) that essentially
+   * deletes row2 and column2.
+   * row1 and column1 should be undeleted.
+   */
+  private void testMinorCompactionWithDelete(Delete delete) throws Exception {
+    testMinorCompactionWithDelete(delete, 0);
+  }
+  private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception {
+    HRegionIncommon loader = new HRegionIncommon(r);
+    for (int i = 0; i < compactionThreshold + 1; i++) {
+      addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
+      addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
+      addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
+      addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
+      r.flushcache();
+    }
+
+    Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+
+    // Now add deletes to memstore and then flush it. That will put us over
+    // the compaction threshold of 3 store files. Compacting these store files
+    // should result in a compacted store file that has no references to the
+    // deleted row.
+    r.delete(delete, null, true);
+
+    // Make sure that we have only deleted family2 from secondRowBytes
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(expectedResultsAfterDelete, result.size());
+    // but we still have firstrow
+    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+
+    r.flushcache();
+    // should not change anything.
+    // Let us check again
+
+    // Make sure that we have only deleted family2 from secondRowBytes
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(expectedResultsAfterDelete, result.size());
+    // but we still have firstrow
+    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+
+    // do a compaction
+    Store store2 = this.r.stores.get(fam2);
+    int numFiles1 = store2.getStorefiles().size();
+    assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
+    store2.compactRecent(compactionThreshold);   // = 3
+    int numFiles2 = store2.getStorefiles().size();
+    // Check that we did compact
+    assertTrue("Number of store files should go down", numFiles1 > numFiles2);
+    // Check that it was a minor compaction.
+    assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);
+
+    // Make sure that we have only deleted family2 from secondRowBytes
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(expectedResultsAfterDelete, result.size());
+    // but we still have firstrow
+    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+  }
+
+  private void verifyCounts(int countRow1, int countRow2) throws Exception {
+    int count1 = 0;
+    int count2 = 0;
+    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
+      HFileScanner scanner = f.getReader().getScanner(false, false);
+      scanner.seekTo();
+      do {
+        byte [] row = scanner.getKeyValue().getRow();
+        if (Bytes.equals(row, STARTROW)) {
+          count1++;
+        } else if(Bytes.equals(row, secondRowBytes)) {
+          count2++;
+        }
+      } while(scanner.next());
+    }
+    assertEquals(countRow1,count1);
+    assertEquals(countRow2,count2);
+  }
+
   /**
-   * Verify that you can stop a long-running compaction 
+   * Verify that you can stop a long-running compaction
    * (used during RS shutdown)
    * @throws Exception
    */
@@ -253,9 +371,9 @@
     try {
       // Create a couple store files w/ 15KB (over 10KB interval)
-      int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
+      int jmax = (int) Math.ceil(15.0/compactionThreshold);
       byte [] pad = new byte[1000]; // 1 KB chunk
-      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+      for (int i = 0; i < compactionThreshold; i++) {
        HRegionIncommon loader = new HRegionIncommon(r);
         Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
         for (int j = 0; j < jmax; j++) {
@@ -265,7 +383,7 @@
         loader.put(p);
         loader.flushcache();
       }
-      
+
       HRegion spyR = spy(r);
       doAnswer(new Answer() {
         public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -276,10 +394,10 @@
 
       // force a minor compaction, but not before requesting a stop
       spyR.compactStores();
-      
-      // ensure that the compaction stopped, all old files are intact, 
+
+      // ensure that the compaction stopped, all old files are intact,
       Store s = r.stores.get(COLUMN_FAMILY);
-      assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
+      assertEquals(compactionThreshold, s.getStorefilesCount());
       assertTrue(s.getStorefilesSize() > 15*1000);
       // and no new store files persisted past compactStores()
       FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
@@ -291,7 +409,7 @@
       Store.closeCheckInterval = origWI;
 
       // Delete all Store information once done using
-      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+      for (int i = 0; i < compactionThreshold; i++) {
         Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
         byte [][] famAndQf = {COLUMN_FAMILY, null};
         delete.deleteFamily(famAndQf[0]);
@@ -306,12 +424,12 @@
         store.ttl = ttlInSeconds * 1000;
       }
       Thread.sleep(ttlInSeconds * 1000);
-      
+
       r.compactStores(true);
       assertEquals(0, count());
     }
   }
-  
+
   private int count() throws IOException {
     int count = 0;
     for (StoreFile f: this.r.stores.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1032175)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy)
@@ -45,6 +45,7 @@
 
   /** Keeps track of deletes */
   protected DeleteTracker deletes;
+  protected boolean retainDeletesInOutput;
 
   /** Keeps track of columns and versions */
   protected ColumnTracker columns;
@@ -71,7 +72,8 @@
    */
   public ScanQueryMatcher(Scan scan, byte [] family,
       NavigableSet columns, long ttl,
-      KeyValue.KeyComparator rowComparator, int maxVersions) {
+      KeyValue.KeyComparator rowComparator, int maxVersions,
+      boolean retainDeletesInOutput) {
     this.tr = scan.getTimeRange();
     this.oldestStamp = System.currentTimeMillis() - ttl;
     this.rowComparator = rowComparator;
@@ -79,6 +81,7 @@
     this.stopRow = scan.getStopRow();
     this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
     this.filter = scan.getFilter();
+    this.retainDeletesInOutput = retainDeletesInOutput;
 
     // Single branch to deal with two types of reads (columns vs all in family)
     if (columns == null || columns.size() == 0) {
@@ -90,6 +93,13 @@
       this.columns = new ExplicitColumnTracker(columns,maxVersions);
     }
   }
+  public ScanQueryMatcher(Scan scan, byte [] family,
+      NavigableSet columns, long ttl,
+      KeyValue.KeyComparator rowComparator, int maxVersions) {
+    /* By default we will not include deletes */
+    /* deletes are included explicitly (for minor compaction) */
+    this(scan, family, columns, ttl, rowComparator, maxVersions, false);
+  }
 
   /**
   * Determines if the caller should do one of several things:
@@ -159,7 +169,12 @@
         this.deletes.add(bytes, offset, qualLength, timestamp, type);
         // Can't early out now, because DelFam come before any other keys
       }
-      return MatchCode.SKIP;
+      if (retainDeletesInOutput) {
+        return MatchCode.INCLUDE;
+      }
+      else {
+        return MatchCode.SKIP;
+      }
     }
 
     if (!this.deletes.isEmpty() &&
@@ -356,4 +371,4 @@
      */
     SEEK_NEXT_USING_HINT,
   }
-}
\ No newline at end of file
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (revision 1032175)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (working copy)
@@ -1,138 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A scanner that does a minor compaction at the same time. Doesn't need to
- * implement ChangedReadersObserver, since it doesn't scan memstore, only store files
- * and optionally the memstore-snapshot.
- */
-public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
-  private KeyValueHeap heap;
-  private KeyValue.KVComparator comparator;
-
-  MinorCompactingStoreScanner(Store store, List scanners)
-      throws IOException {
-    comparator = store.comparator;
-    KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
-    for (KeyValueScanner scanner : scanners ) {
-      scanner.seek(firstKv);
-    }
-    heap = new KeyValueHeap(scanners, store.comparator);
-  }
-
-  MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
-      List scanners)
-      throws IOException {
-    this.comparator = comparator;
-
-    KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
-    for (KeyValueScanner scanner : scanners ) {
-      scanner.seek(firstKv);
-    }
-
-    heap = new KeyValueHeap(scanners, comparator);
-  }
-
-  public KeyValue peek() {
-    return heap.peek();
-  }
-
-  public KeyValue next() throws IOException {
-    return heap.next();
-  }
-
-  @Override
-  public boolean seek(KeyValue key) {
-    // cant seek.
-    throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
-  }
-
-  public boolean reseek(KeyValue key) {
-    return seek(key);
-  }
-
-  /**
-   * High performance merge scan.
-   * @param writer
-   * @return True if more.
-   * @throws IOException
-   */
-  public boolean next(StoreFile.Writer writer) throws IOException {
-    KeyValue row = heap.peek();
-    if (row == null) {
-      close();
-      return false;
-    }
-    KeyValue kv;
-    while ((kv = heap.peek()) != null) {
-      // check to see if this is a different row
-      if (comparator.compareRows(row, kv) != 0) {
-        // reached next row
-        return true;
-      }
-      writer.append(heap.next());
-    }
-    close();
-    return false;
-  }
-
-  @Override
-  public boolean next(List results) throws IOException {
-    KeyValue row = heap.peek();
-    if (row == null) {
-      close();
-      return false;
-    }
-    KeyValue kv;
-    while ((kv = heap.peek()) != null) {
-      // check to see if this is a different row
-      if (comparator.compareRows(row, kv) != 0) {
-        // reached next row
-        return true;
-      }
-      results.add(heap.next());
-    }
-    close();
-    return false;
-  }
-
-  @Override
-  public boolean next(List results, int limit) throws IOException {
-    // should not use limits with minor compacting store scanner
-    return next(results);
-  }
-
-  public void close() {
-    heap.close();
-  }
-
-  @Override
-  public long getSequenceID() {
-    return 0;
-  }
-}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1032175)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy)
@@ -58,12 +58,14 @@
    * @param columns which columns we are scanning
    * @throws IOException
    */
-  StoreScanner(Store store, Scan scan, final NavigableSet columns) throws IOException {
+  StoreScanner(Store store, Scan scan, final NavigableSet columns)
+      throws IOException {
     this.store = store;
     this.cacheBlocks = scan.getCacheBlocks();
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), columns,
         store.ttl, store.comparator.getRawComparator(),
-        store.versionsToReturn(scan.getMaxVersions()));
+        store.versionsToReturn(scan.getMaxVersions()),
+        false);
     this.isGet = scan.isGetScan();
 
     // pass columns = try to filter out unnecessary ScanFiles
@@ -89,14 +91,15 @@
    * @param scan the spec
    * @param scanners ancilliary scanners
    */
-  StoreScanner(Store store, Scan scan, List scanners)
-  throws IOException {
+  StoreScanner(Store store, Scan scan, List scanners,
+      boolean retainDeletesInOutput)
+      throws IOException {
     this.store = store;
     this.cacheBlocks = false;
     this.isGet = false;
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), null,
         store.ttl, store.comparator.getRawComparator(),
-        store.versionsToReturn(scan.getMaxVersions()));
+        store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
 
     // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {
@@ -117,7 +120,7 @@
     this.isGet = false;
     this.cacheBlocks = scan.getCacheBlocks();
     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
-        comparator.getRawComparator(), scan.getMaxVersions());
+        comparator.getRawComparator(), scan.getMaxVersions(), false);
 
     // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1032175)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -710,6 +710,28 @@
   }
 
   /*
+   * Compact the most recent N files. Essentially a hook for testing.
+   */
+  protected void compactRecent(int N) throws IOException {
+    synchronized(compactLock) {
+      List filesToCompact = this.storefiles;
+      int count = filesToCompact.size();
+      if (N > count) {
+        throw new RuntimeException("Not enough files");
+      }
+
+      filesToCompact = new ArrayList(filesToCompact.subList(count-N, count));
+      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+      boolean majorcompaction = (N == count);
+
+      // Ready to go. Have list of files to compact.
+      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
+      // Move the compaction into place.
+      StoreFile sf = completeCompaction(filesToCompact, writer);
+    }
+  }
+
+  /*
    * @param files
    * @return True if any of the files in files are References.
    */
@@ -843,13 +865,12 @@
     // where all source cells are expired or deleted.
     StoreFile.Writer writer = null;
     try {
-      // NOTE: the majority of the time for a compaction is spent in this section
-
-      if (majorCompaction) {
       InternalScanner scanner = null;
       try {
         Scan scan = new Scan();
         scan.setMaxVersions(family.getMaxVersions());
-        scanner = new StoreScanner(this, scan, scanners);
+        /* include deletes, unless we are doing a major compaction */
+        scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
         int bytesWritten = 0;
         // since scanner.next() can return 'false' but still be delivering data,
         // we have to use a do/while loop.
@@ -888,39 +909,6 @@
             scanner.close();
           }
         }
-      } else {
-        MinorCompactingStoreScanner scanner = null;
-        try {
-          scanner = new MinorCompactingStoreScanner(this, scanners);
-          if (scanner.peek() != null) {
-            writer = createWriterInTmp(maxKeyCount);
-            int bytesWritten = 0;
-            while (scanner.peek() != null) {
-              KeyValue kv = scanner.next();
-              writer.append(kv);
-
-              // check periodically to see if a system stop is requested
-              if (Store.closeCheckInterval > 0) {
-                bytesWritten += kv.getLength();
-                if (bytesWritten > Store.closeCheckInterval) {
-                  bytesWritten = 0;
-                  if (!this.region.areWritesEnabled()) {
-                    writer.close();
-                    fs.delete(writer.getPath(), false);
-                    throw new InterruptedIOException(
-                        "Aborting compaction of store " + this +
-                        " in region " + this.region +
-                        " because user requested stop.");
-                  }
-                }
-              }
-            }
-          }
-        } finally {
-          if (scanner != null)
-            scanner.close();
-        }
-      }
     } finally {
       if (writer != null) {
         writer.appendMetadata(maxId, majorCompaction);