Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1161347)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -90,6 +91,7 @@
   // ttl in milliseconds.
   protected long ttl;
   protected int minVersions;
+  protected int maxVersions;
   long majorCompactionTime;
   private final int minFilesToCompact;
   private final int maxFilesToCompact;
@@ -178,6 +180,7 @@
       this.ttl *= 1000;
     }
     this.minVersions = family.getMinVersions();
+    this.maxVersions = family.getMaxVersions();
 
     this.memstore = new MemStore(conf, this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());
@@ -481,34 +484,45 @@
     if (set.size() == 0) {
       return null;
     }
-    long oldestTimestamp = System.currentTimeMillis() - ttl;
-    // TODO:  We can fail in the below block before we complete adding this
-    // flush to list of store files.  Add cleanup of anything put on filesystem
-    // if we fail.
-    synchronized (flushLock) {
-      status.setStatus("Flushing " + this + ": creating writer");
-      // A. Write the map out to the disk
-      writer = createWriterInTmp(set.size());
-      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-      try {
-        for (KeyValue kv: set) {
-          // If minVersion > 0 we will wait until the next compaction to
-          // collect expired KVs. (following the logic for maxVersions).
-          // TODO: As Jonathan Gray points out this can be optimized
-          // (see HBASE-4241)
-          if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
-            writer.append(kv);
-            flushed += this.memstore.heapSizeChange(kv, true);
-          }
-        }
-      } finally {
-        // Write out the log sequence number that corresponds to this output
-        // hfile. The hfile is current up to and including logCacheFlushId.
-        status.setStatus("Flushing " + this + ": appending metadata");
-        writer.appendMetadata(logCacheFlushId, false);
-        status.setStatus("Flushing " + this + ": closing flushed file");
-        writer.close();
-      }
-    }
+    Scan scan = new Scan();
+    scan.setMaxVersions(maxVersions);
+    // Use a store scanner to find which rows to flush.
+    // Note that we need to retain deletes, hence we pass
+    // true as the StoreScanner's retainDeletesInOutput argument.
+    InternalScanner scanner = new StoreScanner(this, scan,
+        Collections.singletonList(new CollectionBackedScanner(set,
+            this.comparator)), true);
+    try {
+      // TODO:  We can fail in the below block before we complete adding this
+      // flush to list of store files.  Add cleanup of anything put on filesystem
+      // if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + this + ": creating writer");
+        // A. Write the map out to the disk
+        writer = createWriterInTmp(set.size());
+        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+        try {
+          List<KeyValue> kvs = new ArrayList<KeyValue>();
+          boolean hasMore;
+          do {
+            hasMore = scanner.next(kvs);
+            if (!kvs.isEmpty()) {
+              for (KeyValue kv : kvs) {
+                writer.append(kv);
+                flushed += this.memstore.heapSizeChange(kv, true);
+              }
+              kvs.clear();
+            }
+          } while (hasMore);
+        } finally {
+          // Write out the log sequence number that corresponds to this output
+          // hfile. The hfile is current up to and including logCacheFlushId.
+          status.setStatus("Flushing " + this + ": appending metadata");
+          writer.appendMetadata(logCacheFlushId, false);
+          status.setStatus("Flushing " + this + ": closing flushed file");
+          writer.close();
+        }
+      }
+    } finally {
+      scanner.close();
+    }
 
     // Write-out finished successfully, move into the right spot
@@ -1734,7 +1748,7 @@
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
       (8 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
-      (5 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
+      (6 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
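A note on the flush loop above: InternalScanner.next(List) may return false on the final row while still having filled the batch, which is why the rewrite drains the list before testing the return value rather than using it as a plain while condition. A minimal standalone sketch of that drain pattern (the ScannerDrain class and the out parameter are illustrative, not part of this patch):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;

    public class ScannerDrain {
      /**
       * Drains an InternalScanner one row batch at a time. next(List) can
       * return false on the last row while still populating the list, so
       * the batch must be consumed before the loop condition is tested.
       */
      public static void drain(InternalScanner scanner, List<KeyValue> out)
          throws IOException {
        List<KeyValue> kvs = new ArrayList<KeyValue>();
        boolean hasMore;
        do {
          hasMore = scanner.next(kvs);  // one row's KeyValues per call
          out.addAll(kvs);
          kvs.clear();
        } while (hasMore);
      }
    }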
- status.setStatus("Flushing " + this + ": appending metadata"); - writer.appendMetadata(logCacheFlushId, false); - status.setStatus("Flushing " + this + ": closing flushed file"); - writer.close(); } + } finally { + scanner.close(); } // Write-out finished successfully, move into the right spot @@ -1734,7 +1748,7 @@ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + (8 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) + - (5 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN)); + (6 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + Index: src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (revision 0) @@ -0,0 +1,129 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.SortedSet; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; + +/** + * Utility scanner that wraps a sortable collection and serves + * as a KeyValueScanner. + */ +public class CollectionBackedScanner implements KeyValueScanner { + final private Iterable data; + final KeyValue.KVComparator comparator; + private Iterator iter; + private KeyValue current; + + public CollectionBackedScanner(SortedSet set) { + this(set, KeyValue.COMPARATOR); + } + + public CollectionBackedScanner(SortedSet set, + KeyValue.KVComparator comparator) { + this.comparator = comparator; + data = set; + init(); + } + + public CollectionBackedScanner(List list) { + this(list, KeyValue.COMPARATOR); + } + + public CollectionBackedScanner(List list, + KeyValue.KVComparator comparator) { + Collections.sort(list, comparator); + this.comparator = comparator; + data = list; + init(); + } + + public CollectionBackedScanner(KeyValue.KVComparator comparator, + KeyValue... 
Index: src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java	(revision 1161347)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java	(working copy)
@@ -21,11 +21,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.KeyValue;
 
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -34,21 +33,10 @@
  * to the provided comparator, and then the whole thing pretends
  * to be a store file scanner.
  */
-public class KeyValueScanFixture implements KeyValueScanner {
-  ArrayList<KeyValue> data;
-  Iterator<KeyValue> iter = null;
-  KeyValue current = null;
-  KeyValue.KVComparator comparator;
-
+public class KeyValueScanFixture extends CollectionBackedScanner {
   public KeyValueScanFixture(KeyValue.KVComparator comparator,
                              KeyValue... incData) {
-    this.comparator = comparator;
-
-    data = new ArrayList<KeyValue>(incData.length);
-    for (int i = 0; i < incData.length; ++i) {
-      data.add(incData[i]);
-    }
-    Collections.sort(data, this.comparator);
+    super(comparator, incData);
   }
 
   public static List<KeyValueScanner> scanFixture(KeyValue[] ... kvArrays) {
@@ -58,54 +46,4 @@
     }
     return scanners;
   }
-
-
-  @Override
-  public KeyValue peek() {
-    return this.current;
-  }
-
-  @Override
-  public KeyValue next() {
-    KeyValue res = current;
-
-    if (iter.hasNext())
-      current = iter.next();
-    else
-      current = null;
-    return res;
-  }
-
-  @Override
-  public boolean seek(KeyValue key) {
-    // start at beginning.
-    iter = data.iterator();
-    int cmp;
-    KeyValue kv = null;
-    do {
-      if (!iter.hasNext()) {
-        current = null;
-        return false;
-      }
-      kv = iter.next();
-      cmp = comparator.compare(key, kv);
-    } while (cmp > 0);
-    current = kv;
-    return true;
-  }
-
-  @Override
-  public boolean reseek(KeyValue key) {
-    return seek(key);
-  }
-
-  @Override
-  public void close() {
-    // noop.
-  }
-
-  @Override
-  public long getSequenceID() {
-    return 0;
-  }
 }
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java	(revision 1161347)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java	(working copy)
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 public class TestKeyValueHeap extends HBaseTestCase {
 
@@ -208,33 +209,13 @@
     }
   }
 
-  private static class Scanner implements KeyValueScanner {
-    private Iterator<KeyValue> iter;
-    private KeyValue current;
+  private static class Scanner extends CollectionBackedScanner {
     private boolean closed = false;
 
    public Scanner(List<KeyValue> list) {
-      Collections.sort(list, KeyValue.COMPARATOR);
-      iter = list.iterator();
-      if(iter.hasNext()){
-        current = iter.next();
-      }
+      super(list);
    }
 
-    public KeyValue peek() {
-      return current;
-    }
-
-    public KeyValue next() {
-      KeyValue oldCurrent = current;
-      if(iter.hasNext()){
-        current = iter.next();
-      } else {
-        current = null;
-      }
-      return oldCurrent;
-    }
-
     public void close(){
       closed = true;
     }
@@ -242,28 +223,6 @@
     public boolean isClosed() {
       return closed;
     }
-
-    public boolean seek(KeyValue seekKv) {
-      while(iter.hasNext()){
-        KeyValue next = iter.next();
-        int ret = KeyValue.COMPARATOR.compare(next, seekKv);
-        if(ret >= 0){
-          current = next;
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public boolean reseek(KeyValue key) throws IOException {
-      return seek(key);
-    }
-
-    @Override
-    public long getSequenceID() {
-      return 0;
-    }
   }
 }
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java	(revision 1161347)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java	(working copy)
@@ -42,8 +42,6 @@
     KeyValueScanner scan = new KeyValueScanFixture(
         KeyValue.COMPARATOR, kvs);
 
-    // test simple things.
-    assertNull(scan.peek());
     KeyValue kv = KeyValue.createFirstOnRow(Bytes.toBytes("RowA"));
     // should seek to this:
     assertTrue(scan.seek(kv));
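The assertNull removed above is a consequence of the base-class change: CollectionBackedScanner's init() positions the scanner on the first sorted element at construction time, so peek() is already non-null before any seek(). A tiny illustration (the class and row names are invented):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.CollectionBackedScanner;

    public class PeekBeforeSeek {
      public static void main(String[] args) {
        KeyValue kv = new KeyValue(Bytes.toBytes("RowA"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("v"));
        CollectionBackedScanner scan =
            new CollectionBackedScanner(KeyValue.COMPARATOR, kv);
        // The old KeyValueScanFixture returned null here until seek() was
        // called; the CollectionBackedScanner-based fixture is positioned
        // on construction.
        System.out.println(scan.peek());  // non-null: the RowA KeyValue
      }
    }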
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java	(revision 1161347)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java	(working copy)
@@ -315,14 +315,12 @@
 
     // now flush
     region.flushcache();
-    region.compactStores();
 
-    // oldest version still exists
-    // flushing/minor compactions can't get rid of these, anymore
+    // with HBASE-4241 a flush will eliminate the expired rows
     g = new Get(T1);
     g.setTimeRange(0L, ts-2);
     r = region.get(g, null);
-    checkResult(r, c0, T1);
+    assertTrue(r.isEmpty());
 
     // major compaction
     region.compactStores(true);
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(revision 1161347)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(working copy)
@@ -122,6 +122,9 @@
     Path logdir = new Path(DIR+methodName+"/logs");
     Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
+    // some of the tests write 4 versions and then flush
+    // (with HBASE-4241, lower versions are collected on flush)
+    hcd.setMaxVersions(4);
     FileSystem fs = FileSystem.get(conf);
     fs.delete(logdir, true);
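TestStore pins maxVersions to 4 above because, with this patch, versions beyond the family maximum are collected when the memstore flushes instead of surviving until a compaction. A rough sketch of a schema that relies on that behavior (the table and family names are placeholders, not from the patch):

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FlushAwareSchema {
      public static HTableDescriptor createDescriptor() {
        HTableDescriptor htd = new HTableDescriptor("mytable");
        HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("f"));
        // With HBASE-4241, cells beyond maxVersions (and TTL-expired cells,
        // when minVersions is 0) are dropped at flush time, not only
        // during compactions.
        hcd.setMaxVersions(3);
        hcd.setTimeToLive(86400);  // TTL in seconds
        htd.addFamily(hcd);
        return htd;
      }
    }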