diff --git a/pom.xml b/pom.xml index 7a5bdf4..ae68c28 100644 --- a/pom.xml +++ b/pom.xml @@ -1324,6 +1324,12 @@ 2.3.1 + + org.mapdb + mapdb + 1.0.3 + + com.google.protobuf diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 69412a2..0e9bb9c 100644 --- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Comparator; import java.util.HashMap; @@ -1487,6 +1488,8 @@ public class KeyValue implements Writable, HeapSize, Cloneable { * {@link KeyValue}s. */ public static class MetaComparator extends KVComparator { + private static final long serialVersionUID = -1L; // explicit UID for Serializable; not read directly + private final KeyComparator rawcomparator = new MetaKeyComparator(); public KeyComparator getRawComparator() { @@ -1505,7 +1508,8 @@ public class KeyValue implements Writable, HeapSize, Cloneable { * considered the same as far as this Comparator is concerned. * Hosts a {@link KeyComparator}. */ - public static class KVComparator implements java.util.Comparator { + public static class KVComparator implements java.util.Comparator, Serializable { + private static final long serialVersionUID = -1L; // explicit UID for Serializable; not read directly private final KeyComparator rawcomparator = new KeyComparator(); /** @@ -1920,7 +1924,9 @@ public class KeyValue implements Writable, HeapSize, Cloneable { * Compare key portion of a {@link KeyValue} for keys in -ROOT- * table. 
*/ - public static class RootKeyComparator extends MetaKeyComparator { + public static class RootKeyComparator extends MetaKeyComparator implements Serializable { + private static final long serialVersionUID = -1L; // explicit UID for Serializable; not read directly + public int compareRows(byte [] left, int loffset, int llength, byte [] right, int roffset, int rlength) { // Rows look like this: .META.,ROW_FROM_META,RID @@ -1979,7 +1985,9 @@ public class KeyValue implements Writable, HeapSize, Cloneable { * Compare key portion of a {@link KeyValue} for keys in .META. * table. */ - public static class MetaKeyComparator extends KeyComparator { + public static class MetaKeyComparator extends KeyComparator implements Serializable { + private static final long serialVersionUID = -1L; // explicit UID for Serializable; not read directly + public int compareRows(byte [] left, int loffset, int llength, byte [] right, int roffset, int rlength) { // LOG.info("META " + Bytes.toString(left, loffset, llength) + @@ -2050,7 +2058,8 @@ public class KeyValue implements Writable, HeapSize, Cloneable { * Compare key portion of a {@link KeyValue}. 
*/ public static class KeyComparator - implements RawComparator, SamePrefixComparator { + implements RawComparator, SamePrefixComparator, Serializable { + private static final long serialVersionUID = -1L; // explicit UID for Serializable; not read directly volatile boolean ignoreTimestamp = false; volatile boolean ignoreType = false; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 8b8a5b6..2a6c0c0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -20,15 +20,22 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.SortedSet; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -43,6 +50,10 @@ import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.mapdb.BTreeKeySerializer; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; /** * The MemStore holds in-memory modifications to the Store. 
Modifications @@ -118,8 +129,8 @@ public class MemStore implements HeapSize { this.comparatorIgnoreTimestamp = this.comparator.getComparatorIgnoringTimestamps(); this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType(); - this.kvset = new KeyValueSkipListSet(c); - this.snapshot = new KeyValueSkipListSet(c); + this.kvset = createKeyValueSet(c); + this.snapshot = createKeyValueSet(c); timeRangeTracker = new TimeRangeTracker(); snapshotTimeRangeTracker = new TimeRangeTracker(); this.size = new AtomicLong(DEEP_OVERHEAD); @@ -155,7 +166,7 @@ public class MemStore implements HeapSize { if (!this.kvset.isEmpty()) { this.snapshotSize = keySize(); this.snapshot = this.kvset; - this.kvset = new KeyValueSkipListSet(this.comparator); + this.kvset = createKeyValueSet(this.comparator); this.snapshotTimeRangeTracker = this.timeRangeTracker; this.timeRangeTracker = new TimeRangeTracker(); // Reset heap to not include any keys @@ -208,12 +219,76 @@ public class MemStore implements HeapSize { // OK. Passed in snapshot is same as current snapshot. If not-empty, // create a new snapshot and let the old one go.
if (!ss.isEmpty()) { - this.snapshot = new KeyValueSkipListSet(this.comparator); + this.snapshot = createKeyValueSet(this.comparator); this.snapshotTimeRangeTracker = new TimeRangeTracker(); } this.snapshotSize = 0; } + private static class KVKBTreeKeySerializer extends BTreeKeySerializer implements Serializable { + private static final long serialVersionUID = -11L; // unused + private final Comparator comparator; + public KVKBTreeKeySerializer(Comparator c) { + this.comparator = c; + } + + @Override + public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException { + byte[] previous = null; + for (int i = start; i < end; i++) { + byte[] b = ((KeyValue) keys[i]).getBuffer(); + leadingValuePackWrite(out, b, previous, 0); + previous = b; + } + } + @Override + public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException { + Object[] ret = new Object[size]; + byte[] previous = null; + for (int i = start; i < end; i++) { + byte[] b = leadingValuePackRead(in, previous, 0); + if (b == null) continue; + ret[i] = new KeyValue(b); + previous = b; + } + return ret; + } + @Override + public Comparator getComparator() { + return comparator; + } + } + + private static class KVValueSerializer implements Serializer, Serializable { + private static final long serialVersionUID = -1L; // unused + public void serialize(DataOutput out, KeyValue value) throws IOException { + value.write(out); + } + public KeyValue deserialize(DataInput in, int available) throws IOException { + KeyValue kv = new KeyValue(); + kv.readFields(in); + return kv; + } + public int fixedSize() { + return -1; + } + } + private static final Serializer KV_VALUE_SERIALIZER = new KVValueSerializer(); + + KeyValueSkipListSet createKeyValueSet(KeyValue.KVComparator c) { + if (conf.getBoolean("hbase.regionserver.offheap.memstore", false)) { + DB db = DBMaker + .newMemoryDirectDB() + .transactionDisable() + .asyncWriteFlushDelay(100) + .make(); + 
ConcurrentNavigableMap delegate = db.createTreeMap("test") + .keySerializer(new KVKBTreeKeySerializer(c)).valueSerializer(KV_VALUE_SERIALIZER).make(); + return new KeyValueSkipListSet(delegate); + } else { + return new KeyValueSkipListSet(new ConcurrentSkipListMap(c)); + } + } /** * Write an update * @param kv