diff --git a/pom.xml b/pom.xml
index 7a5bdf4..ae68c28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1324,6 +1324,12 @@
2.3.1
+    <dependency>
+      <groupId>org.mapdb</groupId>
+      <artifactId>mapdb</artifactId>
+      <version>1.0.3</version>
+    </dependency>
+
com.google.protobuf
diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 69412a2..0e9bb9c 100644
--- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.HashMap;
@@ -1487,6 +1488,8 @@ public class KeyValue implements Writable, HeapSize, Cloneable {
* {@link KeyValue}s.
*/
public static class MetaComparator extends KVComparator {
+ private static final long serialVersionUID = -1L; // unused
+
private final KeyComparator rawcomparator = new MetaKeyComparator();
public KeyComparator getRawComparator() {
@@ -1505,7 +1508,8 @@ public class KeyValue implements Writable, HeapSize, Cloneable {
* considered the same as far as this Comparator is concerned.
* Hosts a {@link KeyComparator}.
*/
- public static class KVComparator implements java.util.Comparator {
+ public static class KVComparator implements java.util.Comparator, Serializable {
+ private static final long serialVersionUID = -1L; // unused
private final KeyComparator rawcomparator = new KeyComparator();
/**
@@ -1920,7 +1924,9 @@ public class KeyValue implements Writable, HeapSize, Cloneable {
* Compare key portion of a {@link KeyValue} for keys in -ROOT-
* table.
*/
- public static class RootKeyComparator extends MetaKeyComparator {
+ public static class RootKeyComparator extends MetaKeyComparator implements Serializable {
+ private static final long serialVersionUID = -1L; // unused
+
public int compareRows(byte [] left, int loffset, int llength,
byte [] right, int roffset, int rlength) {
// Rows look like this: .META.,ROW_FROM_META,RID
@@ -1979,7 +1985,9 @@ public class KeyValue implements Writable, HeapSize, Cloneable {
* Compare key portion of a {@link KeyValue} for keys in .META.
* table.
*/
- public static class MetaKeyComparator extends KeyComparator {
+ public static class MetaKeyComparator extends KeyComparator implements Serializable {
+ private static final long serialVersionUID = -1L; // unused
+
public int compareRows(byte [] left, int loffset, int llength,
byte [] right, int roffset, int rlength) {
// LOG.info("META " + Bytes.toString(left, loffset, llength) +
@@ -2050,7 +2058,8 @@ public class KeyValue implements Writable, HeapSize, Cloneable {
* Compare key portion of a {@link KeyValue}.
*/
public static class KeyComparator
- implements RawComparator, SamePrefixComparator {
+ implements RawComparator, SamePrefixComparator, Serializable {
+ private static final long serialVersionUID = -1L; // unused
volatile boolean ignoreTimestamp = false;
volatile boolean ignoreType = false;
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 8b8a5b6..2a6c0c0 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -20,15 +20,22 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -43,6 +50,10 @@ import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.mapdb.BTreeKeySerializer;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Serializer;
/**
* The MemStore holds in-memory modifications to the Store. Modifications
@@ -118,8 +129,8 @@ public class MemStore implements HeapSize {
this.comparatorIgnoreTimestamp =
this.comparator.getComparatorIgnoringTimestamps();
this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
- this.kvset = new KeyValueSkipListSet(c);
- this.snapshot = new KeyValueSkipListSet(c);
+ this.kvset = createKeyValueSet(c);
+ this.snapshot = createKeyValueSet(c);
timeRangeTracker = new TimeRangeTracker();
snapshotTimeRangeTracker = new TimeRangeTracker();
this.size = new AtomicLong(DEEP_OVERHEAD);
@@ -155,7 +166,7 @@ public class MemStore implements HeapSize {
if (!this.kvset.isEmpty()) {
this.snapshotSize = keySize();
this.snapshot = this.kvset;
- this.kvset = new KeyValueSkipListSet(this.comparator);
+ this.kvset = createKeyValueSet(this.comparator);
this.snapshotTimeRangeTracker = this.timeRangeTracker;
this.timeRangeTracker = new TimeRangeTracker();
// Reset heap to not include any keys
@@ -208,12 +219,76 @@ public class MemStore implements HeapSize {
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
- this.snapshot = new KeyValueSkipListSet(this.comparator);
+ this.snapshot = createKeyValueSet(this.comparator);
this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
this.snapshotSize = 0;
}
+  /**
+   * MapDB B-tree key serializer for {@link KeyValue} keys.
+   * <p>
+   * Keys are written as their raw bytes using the leading-value packing helpers
+   * inherited from {@link BTreeKeySerializer} and are rebuilt with
+   * <code>new KeyValue(byte[])</code>. Ordering is delegated to the comparator
+   * supplied at construction time.
+   * <p>
+   * NOTE(review): only the raw bytes are round-tripped; any per-instance state a
+   * KeyValue keeps outside its byte array (presumably the memstore timestamp) is
+   * not restored on read -- confirm this is acceptable for readers.
+   */
+  private static class KVKBTreeKeySerializer extends BTreeKeySerializer<KeyValue>
+      implements Serializable {
+    private static final long serialVersionUID = -11L; // unused
+
+    /** Comparator returned to MapDB for ordering keys in the tree. */
+    private final Comparator<KeyValue> comparator;
+
+    /**
+     * @param c comparator used to order the keys
+     */
+    public KVKBTreeKeySerializer(Comparator<KeyValue> c) {
+      this.comparator = c;
+    }
+
+    /**
+     * Writes <code>keys[start..end)</code>, each packed against the previously
+     * written key.
+     *
+     * @param out destination
+     * @param start index of the first key to write (inclusive)
+     * @param end index after the last key to write (exclusive)
+     * @param keys node keys; the entries in range are cast to {@link KeyValue}
+     * @throws IOException if writing to <code>out</code> fails
+     */
+    @Override
+    public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException {
+      byte[] previous = null;
+      for (int i = start; i < end; i++) {
+        byte[] b = exactBytes((KeyValue) keys[i]);
+        leadingValuePackWrite(out, b, previous, 0);
+        previous = b;
+      }
+    }
+
+    /**
+     * Reads back the keys written by
+     * {@link #serialize(DataOutput, int, int, Object[])}.
+     *
+     * @param in source
+     * @param start index of the first slot to fill (inclusive)
+     * @param end index after the last slot to fill (exclusive)
+     * @param size length of the returned array
+     * @return array of length <code>size</code>; slots outside the range, and slots
+     *         for which the packed read yields null, are left null
+     * @throws IOException if reading from <code>in</code> fails
+     */
+    @Override
+    public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException {
+      Object[] ret = new Object[size];
+      byte[] previous = null;
+      for (int i = start; i < end; i++) {
+        byte[] b = leadingValuePackRead(in, previous, 0);
+        // A null read leaves the slot empty and keeps the previous key as the
+        // packing reference for the next entry.
+        if (b == null) continue;
+        ret[i] = new KeyValue(b);
+        previous = b;
+      }
+      return ret;
+    }
+
+    @Override
+    public Comparator<KeyValue> getComparator() {
+      return comparator;
+    }
+
+    /**
+     * Returns exactly the bytes that make up <code>kv</code>.
+     * <p>
+     * {@link KeyValue#getBuffer()} hands back the whole backing array, which can be
+     * larger than the KeyValue itself when the KeyValue is a slice of a shared
+     * buffer. Serializing the whole array would store unrelated bytes and make
+     * <code>new KeyValue(byte[])</code> decode from the wrong position, so the
+     * slice is copied out unless the KeyValue already spans the entire array.
+     */
+    private static byte[] exactBytes(KeyValue kv) {
+      byte[] buf = kv.getBuffer();
+      int offset = kv.getOffset();
+      int length = kv.getLength();
+      if (offset == 0 && length == buf.length) {
+        return buf;
+      }
+      return Arrays.copyOfRange(buf, offset, offset + length);
+    }
+  }
+
+  /**
+   * MapDB value serializer that round-trips a {@link KeyValue} through its
+   * Writable methods, <code>write(DataOutput)</code> and
+   * <code>readFields(DataInput)</code>.
+   * <p>
+   * The type parameter is required: with a raw <code>Serializer</code> the
+   * KeyValue-typed methods below would not implement the interface methods.
+   */
+  private static class KVValueSerializer implements Serializer<KeyValue>, Serializable {
+    private static final long serialVersionUID = -1L; // unused
+
+    /**
+     * @param out destination
+     * @param value KeyValue to write
+     * @throws IOException if writing to <code>out</code> fails
+     */
+    @Override
+    public void serialize(DataOutput out, KeyValue value) throws IOException {
+      value.write(out);
+    }
+
+    /**
+     * @param in source
+     * @param available not used by this implementation
+     * @return a new KeyValue populated from <code>in</code>
+     * @throws IOException if reading from <code>in</code> fails
+     */
+    @Override
+    public KeyValue deserialize(DataInput in, int available) throws IOException {
+      KeyValue kv = new KeyValue();
+      kv.readFields(in);
+      return kv;
+    }
+
+    /**
+     * @return -1; presumably MapDB's marker for variable-size values -- confirm
+     *         against the Serializer contract
+     */
+    @Override
+    public int fixedSize() {
+      return -1;
+    }
+  }
+
+  /** Shared value serializer instance; the serializer has no instance fields. */
+  private static final Serializer<KeyValue> KV_VALUE_SERIALIZER = new KVValueSerializer();
+
+  /**
+   * Builds a new sorted KeyValue set for the memstore or its snapshot.
+   * <p>
+   * When <code>hbase.regionserver.offheap.memstore</code> is true in the
+   * configuration (default false) the set wraps a MapDB tree map created in a
+   * fresh <code>newMemoryDirectDB()</code> store with transactions disabled;
+   * otherwise it wraps a {@link ConcurrentSkipListMap} ordered by <code>c</code>.
+   * <p>
+   * NOTE(review): the DB created on the MapDB path is neither stored nor closed
+   * by this method; whether its memory is released when the set is dropped
+   * should be confirmed.
+   *
+   * @param c comparator that orders the KeyValues in the returned set
+   * @return a new set backed by the selected map implementation
+   */
+  KeyValueSkipListSet createKeyValueSet(KeyValue.KVComparator c) {
+    boolean useOffHeap = conf.getBoolean("hbase.regionserver.offheap.memstore", false);
+    if (!useOffHeap) {
+      // Default path: plain on-heap skip list.
+      return new KeyValueSkipListSet(new ConcurrentSkipListMap(c));
+    }
+
+    // One private MapDB store per set.
+    DB offHeapDb = DBMaker.newMemoryDirectDB()
+        .transactionDisable()
+        .asyncWriteFlushDelay(100)
+        .make();
+    KVKBTreeKeySerializer keySerializer = new KVKBTreeKeySerializer(c);
+    ConcurrentNavigableMap backingMap = offHeapDb.createTreeMap("test")
+        .keySerializer(keySerializer)
+        .valueSerializer(KV_VALUE_SERIALIZER)
+        .make();
+    return new KeyValueSkipListSet(backingMap);
+  }
/**
* Write an update
* @param kv