Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java	(revision 1161205)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java	(working copy)
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ScannerWrapper;
 
 public class TestKeyValueHeap extends HBaseTestCase {
 
@@ -208,33 +209,13 @@
     }
   }
 
-  private static class Scanner implements KeyValueScanner {
-    private Iterator<KeyValue> iter;
-    private KeyValue current;
+  private static class Scanner extends ScannerWrapper {
     private boolean closed = false;
 
     public Scanner(List<KeyValue> list) {
-      Collections.sort(list, KeyValue.COMPARATOR);
-      iter = list.iterator();
-      if(iter.hasNext()){
-        current = iter.next();
-      }
+      super(list);
     }
 
-    public KeyValue peek() {
-      return current;
-    }
-
-    public KeyValue next() {
-      KeyValue oldCurrent = current;
-      if(iter.hasNext()){
-        current = iter.next();
-      } else {
-        current = null;
-      }
-      return oldCurrent;
-    }
-
     public void close(){
       closed = true;
     }
@@ -242,28 +223,6 @@
     public boolean isClosed() {
       return closed;
     }
-
-    public boolean seek(KeyValue seekKv) {
-      while(iter.hasNext()){
-        KeyValue next = iter.next();
-        int ret = KeyValue.COMPARATOR.compare(next, seekKv);
-        if(ret >= 0){
-          current = next;
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public boolean reseek(KeyValue key) throws IOException {
-      return seek(key);
-    }
-
-    @Override
-    public long getSequenceID() {
-      return 0;
-    }
   }
 
 }
Index: src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java	(revision 1161205)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java	(working copy)
@@ -21,11 +21,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.ScannerWrapper;
 import org.apache.hadoop.hbase.KeyValue;
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -34,7 +34,7 @@
  * to the provided comparator, and then the whole thing pretends
  * to be a store file scanner.
  */
-public class KeyValueScanFixture implements KeyValueScanner {
+public class KeyValueScanFixture extends ScannerWrapper {
   ArrayList<KeyValue> data;
   Iterator<KeyValue> iter = null;
   KeyValue current = null;
@@ -42,13 +42,7 @@
 
   public KeyValueScanFixture(KeyValue.KVComparator comparator,
                              KeyValue... incData) {
-    this.comparator = comparator;
-
-    data = new ArrayList<KeyValue>(incData.length);
-    for( int i = 0; i < incData.length ; ++i) {
-      data.add(incData[i]);
-    }
-    Collections.sort(data, this.comparator);
+    super(incData, comparator);
   }
 
   public static List<KeyValueScanner> scanFixture(KeyValue[] ... kvArrays) {
@@ -58,54 +52,4 @@
     }
     return scanners;
   }
-
-
-  @Override
-  public KeyValue peek() {
-    return this.current;
-  }
-
-  @Override
-  public KeyValue next() {
-    KeyValue res = current;
-
-    if (iter.hasNext())
-      current = iter.next();
-    else
-      current = null;
-    return res;
-  }
-
-  @Override
-  public boolean seek(KeyValue key) {
-    // start at beginning.
-    iter = data.iterator();
-    int cmp;
-    KeyValue kv = null;
-    do {
-      if (!iter.hasNext()) {
-        current = null;
-        return false;
-      }
-      kv = iter.next();
-      cmp = comparator.compare(key, kv);
-    } while (cmp > 0);
-    current = kv;
-    return true;
-  }
-
-  @Override
-  public boolean reseek(KeyValue key) {
-    return seek(key);
-  }
-
-  @Override
-  public void close() {
-    // noop.
-  }
-
-  @Override
-  public long getSequenceID() {
-    return 0;
-  }
 }
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java	(revision 1161205)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java	(working copy)
@@ -42,8 +42,6 @@
     KeyValueScanner scan = new KeyValueScanFixture(
         KeyValue.COMPARATOR, kvs);
 
-    // test simple things.
-    assertNull(scan.peek());
     KeyValue kv = KeyValue.createFirstOnRow(Bytes.toBytes("RowA"));
     // should seek to this:
     assertTrue(scan.seek(kv));
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java	(revision 1161205)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java	(working copy)
@@ -315,14 +315,11 @@
 
     // now flush
     region.flushcache();
-    region.compactStores();
 
-    // oldest version still exists
-    // flushing/minor compactions can't get rid of these, anymore
     g = new Get(T1);
     g.setTimeRange(0L, ts-2);
     r = region.get(g, null);
-    checkResult(r, c0, T1);
+    assertTrue(r.isEmpty());
 
     // major compaction
     region.compactStores(true);
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1161205)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ScannerWrapper;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -90,6 +91,7 @@
   // ttl in milliseconds.
   protected long ttl;
   protected int minVersions;
+  protected int maxVersions;
   long majorCompactionTime;
   private final int minFilesToCompact;
   private final int maxFilesToCompact;
@@ -178,6 +180,7 @@
       this.ttl *= 1000;
     }
     this.minVersions = family.getMinVersions();
+    this.maxVersions = family.getMaxVersions();
     this.memstore = new MemStore(conf, this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());
 
@@ -481,24 +484,44 @@
     if (set.size() == 0) {
       return null;
     }
-    long oldestTimestamp = System.currentTimeMillis() - ttl;
+    // Run the snapshot through a StoreScanner so that versions beyond
+    // maxVersions, and TTL-expired cells not protected by minVersions,
+    // are dropped at flush time rather than at the next compaction.
+    Scan scan = new Scan();
+    scan.setMaxVersions(maxVersions);
     // TODO:  We can fail in the below block before we complete adding this
     // flush to list of store files.  Add cleanup of anything put on filesystem
     // if we fail.
     synchronized (flushLock) {
       status.setStatus("Flushing " + this + ": creating writer");
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
       try {
-        for (KeyValue kv: set) {
-          // If minVersion > 0 we will wait until the next compaction to
-          // collect expired KVs. (following the logic for maxVersions).
-          // TODO: As Jonathan Gray points this can be optimized
-          // (see HBASE-4241)
-          if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
-            writer.append(kv);
-            flushed += this.memstore.heapSizeChange(kv, true);
+        InternalScanner scanner = null;
+        try {
+          // Delete markers must be kept (retainDeletesInOutput == true):
+          // they can mask cells that live in older store files.
+          scanner = new StoreScanner(this, scan,
+              Collections.singletonList(
+                  new ScannerWrapper(set, this.comparator)), true);
+          List<KeyValue> kvs = new ArrayList<KeyValue>();
+          while (true) {
+            // next() appends to kvs and may return false together with the
+            // final non-empty batch, so write the batch before stopping.
+            boolean hasMore = scanner.next(kvs);
+            for (KeyValue kv : kvs) {
+              writer.append(kv);
+              flushed += this.memstore.heapSizeChange(kv, true);
+            }
+            kvs.clear();
+            if (!hasMore) {
+              break;
+            }
+          }
+        } finally {
+          if (scanner != null) {
+            scanner.close();
           }
         }
       } finally {
Index: src/main/java/org/apache/hadoop/hbase/util/ScannerWrapper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/ScannerWrapper.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/ScannerWrapper.java	(revision 0)
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+/**
+ * Utility scanner that wraps a sorted collection of {@link KeyValue}s and
+ * exposes it as a {@link KeyValueScanner}.
+ * <p>
+ * The collection is not copied (except by the array constructor); it is
+ * iterated lazily, so it should not be modified while the scanner is in use.
+ */
+public class ScannerWrapper implements KeyValueScanner {
+  private final Iterable<KeyValue> data;
+  private final KeyValue.KVComparator comparator;
+  private Iterator<KeyValue> iter;
+  private KeyValue current;
+
+  /**
+   * Wraps a set that is already sorted by {@link KeyValue#COMPARATOR}.
+   * @param set the KeyValues to scan
+   */
+  public ScannerWrapper(SortedSet<KeyValue> set) {
+    this(set, KeyValue.COMPARATOR);
+  }
+
+  /**
+   * Wraps a set that is already sorted by the given comparator.
+   * @param set the KeyValues to scan
+   * @param comparator the ordering of {@code set}; also used for seeks
+   */
+  public ScannerWrapper(SortedSet<KeyValue> set,
+      KeyValue.KVComparator comparator) {
+    this.comparator = comparator;
+    this.data = set;
+    init();
+  }
+
+  /**
+   * Sorts the list in place by {@link KeyValue#COMPARATOR} and wraps it.
+   * @param list the KeyValues to scan; reordered by this call
+   */
+  public ScannerWrapper(List<KeyValue> list) {
+    this(list, KeyValue.COMPARATOR);
+  }
+
+  /**
+   * Sorts the list in place by the given comparator and wraps it.
+   * @param list the KeyValues to scan; reordered by this call
+   * @param comparator the ordering used for sorting and seeks
+   */
+  public ScannerWrapper(List<KeyValue> list,
+      KeyValue.KVComparator comparator) {
+    Collections.sort(list, comparator);
+    this.comparator = comparator;
+    this.data = list;
+    init();
+  }
+
+  /**
+   * Copies the array, sorts the copy by the given comparator and wraps it.
+   * @param array the KeyValues to scan; not modified
+   * @param comparator the ordering used for sorting and seeks
+   */
+  public ScannerWrapper(KeyValue[] array, KeyValue.KVComparator comparator) {
+    this(new ArrayList<KeyValue>(Arrays.asList(array)), comparator);
+  }
+
+  /** Positions the scanner on the first element of the collection. */
+  private void init() {
+    iter = data.iterator();
+    current = iter.hasNext() ? iter.next() : null;
+  }
+
+  @Override
+  public KeyValue peek() {
+    return current;
+  }
+
+  @Override
+  public KeyValue next() {
+    KeyValue oldCurrent = current;
+    current = iter.hasNext() ? iter.next() : null;
+    return oldCurrent;
+  }
+
+  /**
+   * Positions the scanner on the first KeyValue that is greater than or
+   * equal to {@code seekKv}, searching from the start of the collection.
+   * @return true if such a KeyValue exists; otherwise the scanner is
+   *     exhausted and {@link #peek()} returns null
+   */
+  @Override
+  public boolean seek(KeyValue seekKv) {
+    // Restart from the beginning and drop the old position so that
+    // reseek() does not short-circuit on a stale current element.
+    iter = data.iterator();
+    current = null;
+    return reseek(seekKv);
+  }
+
+  /**
+   * Moves forward to the first KeyValue, at or after the current position,
+   * that is greater than or equal to {@code seekKv}.
+   * @return true if such a KeyValue exists; otherwise the scanner is
+   *     exhausted and {@link #peek()} returns null
+   */
+  @Override
+  public boolean reseek(KeyValue seekKv) {
+    // current has already been taken off the iterator, so it must be
+    // checked first or it would be skipped.
+    if (current != null && comparator.compare(current, seekKv) >= 0) {
+      return true;
+    }
+    while (iter.hasNext()) {
+      KeyValue next = iter.next();
+      if (comparator.compare(next, seekKv) >= 0) {
+        current = next;
+        return true;
+      }
+    }
+    current = null;
+    return false;
+  }
+
+  @Override
+  public long getSequenceID() {
+    return 0;
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
+}