diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 7579559..d6a487f 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -58,7 +58,7 @@ public class MemStore implements HeapSize { private final long ttl; - // MemStore. Use a SkipListMap rather than SkipListSet because of the + // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the // better semantics. The Map will overwrite if passed a key it already had // whereas the Set will not add new KV if key is same though value might be // different. Value is not important -- just make sure always same @@ -575,7 +575,7 @@ public class MemStore implements HeapSize { * @return true if done with store (early-out), false if not * @throws IOException */ - private boolean internalGet(final NavigableSet set, + boolean internalGet(final NavigableSet set, final QueryMatcher matcher, final List result) throws IOException { if(set.isEmpty()) return false; diff --git a/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java b/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java index 7602259..9e9295d 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java @@ -279,9 +279,9 @@ public class QueryMatcher { MatchCode mc = columns.checkColumn(bytes, columnOffset, columnLength); if (mc == MatchCode.INCLUDE && this.filter != null) { switch(this.filter.filterKeyValue(kv)) { - case INCLUDE: return MatchCode.INCLUDE; - case SKIP: return MatchCode.SKIP; - default: return MatchCode.DONE; + case INCLUDE: return MatchCode.INCLUDE; + case SKIP: return MatchCode.SKIP; + default: return MatchCode.DONE; } } return mc; diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/java/org/apache/hadoop/hbase/regionserver/Store.java 
index 4568bcd..a0c4bff 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1607,22 +1607,46 @@ public class Store implements HConstants, HeapSize { // Setting up the QueryMatcher Get get = new Get(row); - NavigableSet qualifiers = + NavigableSet qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); qualifiers.add(qualifier); QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl, keyComparator, 1); - - // Read from memstore - if (this.memstore.get(matcher, result)) { + + boolean newTs = true; + KeyValue kv = null; + // Read from memstore first: + this.memstore.internalGet(this.memstore.kvset, + matcher, result); + if (!result.isEmpty()) { + kv = result.get(0).clone(); + newTs = false; + } else { + // try the snapshot. + this.memstore.internalGet(this.memstore.snapshot, + matcher, result); + if (!result.isEmpty()) { + kv = result.get(0).clone(); + } + } + + if (kv != null) { // Received early-out from memstore // Make a copy of the KV and increment it - KeyValue kv = result.get(0).clone(); byte [] buffer = kv.getBuffer(); int valueOffset = kv.getValueOffset(); value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount; Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0, Bytes.SIZEOF_LONG); + if (newTs) { + long currTs = System.currentTimeMillis(); + if (currTs == kv.getTimestamp()) { + currTs++; // just in case something wacky happens. 
+ } + byte [] stampBytes = Bytes.toBytes(currTs); + Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0, + Bytes.SIZEOF_LONG); + } return new ICVResult(value, 0, kv); } // Check if we even have storefiles diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java index 87faf1e..6b9023d 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -1,15 +1,6 @@ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.concurrent.ConcurrentSkipListSet; - import junit.framework.TestCase; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -22,6 +13,15 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.Progressable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; + /** * Test class fosr the Store */ @@ -242,7 +242,7 @@ public class TestStore extends TestCase { this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value))); Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount); - assertEquals(vas.value, value+amount); + assertEquals(value+amount, vas.value); store.add(vas.kv); Get get = new Get(row); get.addColumn(family, qf1); @@ -361,5 +361,45 @@ public class TestStore extends TestCase { this.store.get(get, qualifiers, result); assertEquals(amount, Bytes.toLong(result.get(0).getValue())); } + + public void 
testIncrementColumnValue_ICVDuringFlush() + throws IOException { + init(this.getName()); + + long value = 1L; + long amount = 3L; + this.store.add(new KeyValue(row, family, qf1, + System.currentTimeMillis(), + Bytes.toBytes(value))); + + // snapshot the store. + this.store.snapshot(); + + // increment during the snapshot... + + Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount); + + // then flush. + this.store.flushCache(id++); + assertEquals(1, this.store.getStorefiles().size()); + assertEquals(0, this.store.memstore.kvset.size()); + + Get get = new Get(row); + get.addColumn(family, qf1); + get.setMaxVersions(); // all versions. + List results = new ArrayList(); + + NavigableSet cols = new TreeSet(); + cols.add(qf1); + + this.store.get(get, cols, results); + // only one, because Store.ICV doesn't add to memstore. + assertEquals(1, results.size()); + + // but the timestamps should be different... + long icvTs = vas.kv.getTimestamp(); + long storeTs = results.get(0).getTimestamp(); + assertTrue(icvTs != storeTs); + } }