diff --git a/oak-core/pom.xml b/oak-core/pom.xml
index 38c9ae4..3fe9f38 100644
--- a/oak-core/pom.xml
+++ b/oak-core/pom.xml
@@ -232,6 +232,20 @@
       <artifactId>slf4j-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.21</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.directmemory</groupId>
+      <artifactId>directmemory-cache</artifactId>
+      <!-- TODO(review): replace SNAPSHOT with a released directmemory version before merge -->
+      <version>0.2-SNAPSHOT</version>
+      <optional>true</optional>
+    </dependency>
+
     <!-- Findbugs annotations -->
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheWrapperFactory.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheWrapperFactory.java
new file mode 100644
index 0000000..86b655e
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheWrapperFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.cache;
+
+import com.google.common.cache.Cache;
+
+public interface CacheWrapperFactory<V> {
+
+    Cache<String,V> wrap(Cache<String,V> cache,ForwardingListener<String,V> listener);
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/DirectMemoryCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/DirectMemoryCache.java
new file mode 100644
index 0000000..6742d65
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/DirectMemoryCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.cache;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.*;
+import com.google.common.cache.CacheStats;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.directmemory.cache.CacheService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.cache.AbstractCache.SimpleStatsCounter;
+import static com.google.common.cache.AbstractCache.StatsCounter;
+
+public class DirectMemoryCache<V> extends ForwardingCache.SimpleForwardingCache<String,V> implements RemovalListener<String,V>{
+    private static final AtomicLong COUNTER = new AtomicLong();
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * As the key names used are path elements the in memory map maintained by DirectMemory
+     * consumes lot more memory
+     */
+    private static final int MAX_ELEMENTS_OFF_HEAP = 500000;
+
+    private final AtomicInteger ai = new AtomicInteger();
+
+    /**
+     * Experimental feature in which digest of key is stored instead of actual key
+     */
+    private final boolean HASH_KEY = true;
+
+    /**
+     * It is used to partition key space to use same off heap cache
+     * shared between multiple Guava cache
+     */
+    private final String prefix = "" + COUNTER.incrementAndGet() + "_";
+    private final CacheService<String,Object> offHeapCache;
+    private final StatsCounter statsCounter = new SimpleStatsCounter();
+
+    public DirectMemoryCache(CacheService<String, Object> offHeapCache, Cache<String, V> cache) {
+        super(cache);
+        this.offHeapCache = offHeapCache;
+    }
+
+    @Nullable
+    @Override
+    public V getIfPresent(Object key) {
+        V result = super.getIfPresent(key);
+        if(result == null){
+            result = retrieve(key);
+        }
+        return result;
+    }
+
+
+    @Override
+    public V get(final String key, final Callable<? extends V> valueLoader) throws ExecutionException {
+        return super.get(key, new Callable<V>() {
+            @Override
+            public V call() throws Exception {
+                //Check in offHeap first
+                V result = retrieve(key);
+
+                //Not found in L2 then load
+                if(result == null){
+                    result = valueLoader.call();
+                }
+                return result;
+            }
+        });
+    }
+
+    @Override
+    public ImmutableMap<String, V> getAllPresent(Iterable<?> keys) {
+        List<?> list = Lists.newArrayList(keys);
+        ImmutableMap<String, V> result = super.getAllPresent(list);
+
+        //All the requested keys found then no
+        //need to check L2
+        if(result.size() == list.size()){
+            return result;
+        }
+
+        //Look up value from L2
+        Map<String,V> r2 = Maps.newHashMap(result);
+        for(Object key : list){
+            if(!result.containsKey(key)){
+                V val = retrieve(key);
+                if(val != null){
+                    r2.put((String) key,val);
+                }
+            }
+        }
+        return ImmutableMap.copyOf(r2);
+    }
+
+    @Override
+    public void invalidate(Object key) {
+        super.invalidate(key);
+        offHeapCache.free(prepareKey(key));
+    }
+
+    @Override
+    public void invalidateAll(Iterable<?> keys) {
+        super.invalidateAll(keys);
+        for(Object key : keys){
+            offHeapCache.free(prepareKey(key));
+        }
+    }
+
+    @Override
+    public void invalidateAll() {
+        super.invalidateAll();
+        //Look for keys which are part of this map and free them
+        for(String key : offHeapCache.getMap().keySet()){
+            if(currentCacheKey(key)){
+                offHeapCache.free(key);
+            }
+        }
+    }
+
+    @Override
+    public void onRemoval(RemovalNotification<String, V> notification) {
+        if(notification.getCause() == RemovalCause.SIZE){
+            if(offHeapCache.entries() > MAX_ELEMENTS_OFF_HEAP){
+                if(ai.incrementAndGet() % 100 == 0){
+                    logger.warn("Number of entries in off heap cache is exceeding the limit {}. ",MAX_ELEMENTS_OFF_HEAP);
+                }
+                return;
+            }
+
+            putInternal(notification.getKey(),notification.getValue());
+        }
+    }
+
+    private void putInternal(String key,V value){
+        String preparedKey = prepareKey(key);
+        Object preparedValue =  prepareValue(key,value);
+        offHeapCache.put(preparedKey,preparedValue);
+
+    }
+
+    public CacheStats offHeapStats(){
+        return statsCounter.snapshot();
+    }
+
+    private V retrieve(Object key) {
+        Stopwatch watch = new Stopwatch().start();
+
+        Object value =  offHeapCache.retrieve(prepareKey(key));
+        if(value instanceof KeyAwareValue){
+
+            //Check that the key actually matches the one associated with the
+            //cached value, as two keys can hash to the same hashKey
+            if (((KeyAwareValue) value).actualKey.equals(key)) {
+                value = ((KeyAwareValue) value).value;
+            } else{
+                //Keys do not match due to a hash collision
+                //so nullify the value
+                value = null;
+            }
+        }
+
+        if(value != null){
+            statsCounter.recordHits(1);
+            statsCounter.recordLoadSuccess(watch.elapsed(TimeUnit.NANOSECONDS));
+        }else{
+            statsCounter.recordMisses(1);
+        }
+        return (V)value;
+    }
+
+    private String prepareKey(Object key){
+        if(HASH_KEY && key instanceof String){
+            key = digest((String)key);
+        }
+        return prefix + key;
+    }
+
+    private Object prepareValue(String actualKey, V value) {
+        if(HASH_KEY){
+            return new KeyAwareValue<V>(actualKey, value);
+        }
+        return value;
+    }
+
+    private static String digest(String key) {
+        byte[] hash = DigestUtils.sha256(key);
+        return Base64.encodeBase64URLSafeString(hash);
+    }
+
+    private boolean currentCacheKey(String key) {
+        return key.startsWith(prefix);
+    }
+
+    private static class KeyAwareValue<V> {
+        V value;
+        String actualKey;
+
+        public KeyAwareValue() {
+            //For serialization
+        }
+
+        private KeyAwareValue(String actualKey, V value) {
+            this.value = value;
+            this.actualKey = actualKey;
+        }
+    }
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/DirectMemoryCacheWrapperFactory.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/DirectMemoryCacheWrapperFactory.java
new file mode 100644
index 0000000..4e54da8
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/DirectMemoryCacheWrapperFactory.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.cache;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.cache.Cache;
+import com.mongodb.BasicDBObject;
+import org.apache.directmemory.DirectMemory;
+import org.apache.directmemory.cache.CacheService;
+import org.apache.directmemory.measures.In;
+import org.apache.directmemory.measures.Ram;
+import org.apache.jackrabbit.oak.plugins.mongomk.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.Node;
+import org.apache.jackrabbit.oak.plugins.mongomk.Serializers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class DirectMemoryCacheWrapperFactory<V> implements CacheWrapperFactory<V>, Closeable {
+    private static final int BUFFER_SIZE = Ram.Gb(1);
+    private final CacheService<String, Object> cacheService;
+    private final KryoSerializer serializer = new KryoSerializer();
+
+    public DirectMemoryCacheWrapperFactory(long size) {
+        int noOfBuffers = Math.max(1,(int) (size / BUFFER_SIZE));
+        int buffSize = (int) Math.min(size,BUFFER_SIZE);
+        cacheService = new DirectMemory<String, Object>()
+                .setNumberOfBuffers(noOfBuffers)
+                .setSize(buffSize)
+                .setSerializer(serializer)
+                .setDisposalTime(In.minutes(5))
+                .newCacheService();
+    }
+
+    @Override
+    public Cache<String, V> wrap(Cache<String, V> cache, ForwardingListener<String, V> listener) {
+        DirectMemoryCache<V> dmc = new DirectMemoryCache<V>(cacheService, cache);
+        listener.setDelegate(dmc);
+        return dmc;
+    }
+
+    @Override
+    public void close() throws IOException {
+        //The 0.1-incubator class is not implementing Closeable but
+        //0.2-SNAPSHOT does
+        if (cacheService instanceof Closeable) {
+            ((Closeable) cacheService).close();
+        }
+
+        serializer.close();
+    }
+
+    private static final class KryoSerializer
+            implements org.apache.directmemory.serialization.Serializer {
+        //Kryo class is not thread safe so using a pool
+        private final KryoPool pool = new KryoPool();
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public <T> byte[] serialize(T obj)
+                throws IOException {
+            Class<?> clazz = obj.getClass();
+
+            KryoHolder kh = null;
+            try {
+                kh = pool.get();
+                kh.reset();
+                checkRegisterNeeded(kh.kryo, clazz);
+
+                kh.kryo.writeObject(kh.output, obj);
+                return kh.output.toBytes();
+            } finally {
+                if (kh != null) {
+                    pool.done(kh);
+                }
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public <T> T deserialize(byte[] source, Class<T> clazz)
+                throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+            KryoHolder kh = null;
+            try {
+                kh = pool.get();
+                checkRegisterNeeded(kh.kryo, clazz);
+
+                Input input = new Input(source);
+                return kh.kryo.readObject(input, clazz);
+            } finally {
+                if (kh != null) {
+                    pool.done(kh);
+                }
+            }
+        }
+
+        public void close() {
+            pool.close();
+        }
+
+        private void checkRegisterNeeded(Kryo kryo, Class<?> clazz) {
+            kryo.register(clazz);
+        }
+
+    }
+
+    private static class KryoHolder {
+        private static final int BUFFER_SIZE = 1024;
+        final Kryo kryo;
+        final Output output = new Output(BUFFER_SIZE, -1);
+
+        private KryoHolder() {
+            kryo = new Kryo();
+            kryo.setReferences(false);
+            kryo.setAutoReset(true);
+            //kryo.setRegistrationRequired(true);
+
+            kryo.register(BasicDBObject.class,Serializers.BASIC_DB_OBJECT);
+            kryo.register(Node.class, Serializers.NODE);
+            kryo.register(Node.Children.class, Serializers.CHILDREN);
+            kryo.register(MongoDocumentStore.CachedDocument.class, Serializers.DOCUMENTS);
+
+            kryo.setClassLoader(getClass().getClassLoader());
+        }
+
+        private void reset(){
+            output.clear();
+        }
+    }
+
+    private static class KryoPool  {
+
+        private final Queue<KryoHolder> objects = new ConcurrentLinkedQueue<KryoHolder>();
+
+        public KryoHolder get(){
+            KryoHolder kh;
+            if((kh = objects.poll()) == null){
+                kh =  new KryoHolder();
+            }
+            return kh;
+        }
+
+        public void done(KryoHolder kh){
+            objects.offer(kh);
+        }
+
+        public void close() {
+            objects.clear();
+        }
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/ForwardingListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/ForwardingListener.java
new file mode 100644
index 0000000..e3bd0f3
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/ForwardingListener.java
@@ -0,0 +1,19 @@
+package org.apache.jackrabbit.oak.cache;
+
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class ForwardingListener<K,V> implements RemovalListener<K,V>{
+    private RemovalListener<K,V> delegate;
+
+    @Override
+    public void onRemoval(RemovalNotification<K, V> notification) {
+        if(delegate != null){
+            delegate.onRemoval(notification);
+        }
+    }
+
+    public void setDelegate(RemovalListener<K, V> delegate) {
+        this.delegate = delegate;
+    }
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
index a5ca228..011dc79 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
@@ -463,13 +463,19 @@ public class MongoDocumentStore implements DocumentStore {
     /**
      * A cache entry.
      */
-    static class CachedDocument implements CacheValue {
+    public static class CachedDocument implements CacheValue {
         
-        final long time = System.currentTimeMillis();
+        final long time;
         final Map<String, Object> value;
         
         CachedDocument(Map<String, Object> value) {
+            this(value, System.currentTimeMillis());
+
+        }
+
+        CachedDocument(Map<String, Object> value,long time) {
             this.value = value;
+            this.time = time;
         }
         
         @Override
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
index ff62e10..4e4e011 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
@@ -47,10 +47,12 @@ import org.apache.jackrabbit.mk.json.JsopReader;
 import org.apache.jackrabbit.mk.json.JsopStream;
 import org.apache.jackrabbit.mk.json.JsopTokenizer;
 import org.apache.jackrabbit.mk.json.JsopWriter;
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.cache.CacheValue;
+import org.apache.jackrabbit.oak.cache.CacheWrapperFactory;
 import org.apache.jackrabbit.oak.cache.EmpiricalWeigher;
+import org.apache.jackrabbit.oak.cache.ForwardingListener;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore.Collection;
 import org.apache.jackrabbit.oak.plugins.mongomk.Node.Children;
@@ -1677,6 +1679,7 @@
         private long childrenCacheSize;
         private long diffCacheSize;
         private long documentCacheSize;
+        private CacheWrapperFactory<CacheValue> cacheWrapperFactory;
 
         public Builder() {
             memoryCacheSize(DEFAULT_MEMORY_CACHE_SIZE);
@@ -1783,11 +1786,16 @@
             return weigher;
         }
 
         public Builder withWeigher(Weigher<String, CacheValue> weigher) {
             this.weigher = weigher;
             return this;
         }
 
+        public Builder with(CacheWrapperFactory<CacheValue> factory){
+            this.cacheWrapperFactory = factory;
+            return this;
+        }
+
         public Builder memoryCacheSize(long memoryCacheSize) {
             this.nodeCacheSize = memoryCacheSize * 20 / 100;
             this.childrenCacheSize = memoryCacheSize * 10 / 100;
@@ -1833,8 +1841,15 @@
                 return CacheLIRS.newBuilder().weigher(weigher).
                         maximumWeight(maxWeight).recordStats().build();
             }
-            return CacheBuilder.newBuilder().weigher(weigher).
-                    maximumWeight(maxWeight).recordStats().build();
+            CacheBuilder<String,CacheValue> builder = CacheBuilder.newBuilder().<String,CacheValue>weigher(weigher).
+                    maximumWeight(maxWeight).recordStats();
+
+            if(cacheWrapperFactory != null){
+                ForwardingListener<String,CacheValue> listener = new ForwardingListener<String,CacheValue>();
+                return cacheWrapperFactory.wrap(builder.removalListener(listener).build(),listener);
+            }
+
+            return builder.build();
         }
     }
 
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java
index 9c32f37..963b40c 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java
@@ -30,6 +30,7 @@ import org.apache.felix.scr.annotations.ConfigurationPolicy;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
+import org.apache.jackrabbit.oak.cache.DirectMemoryCacheWrapperFactory;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.MongoConnection;
 import org.apache.jackrabbit.oak.spi.whiteboard.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
@@ -59,6 +60,7 @@ public class MongoMicroKernelService {
     private static final int DEFAULT_PORT = 27017;
     private static final String DEFAULT_DB = "oak";
     private static final int DEFAULT_CACHE = 256;
+    private static final int DEFAULT_OFF_HEAP = -1;
 
     @Property(value = DEFAULT_HOST)
     private static final String PROP_HOST = "host";
@@ -71,6 +73,10 @@ public class MongoMicroKernelService {
 
     @Property(intValue = DEFAULT_CACHE)
     private static final String PROP_CACHE = "cache";
+
+    @Property(intValue = DEFAULT_OFF_HEAP)
+    private static final String PROP_OFF_HEAP = "offheap";
+
     private static final long MB = 1024 * 1024;
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -86,6 +92,7 @@ public class MongoMicroKernelService {
         int port = PropertiesUtil.toInteger(config.get(PROP_PORT), DEFAULT_PORT);
         String db = PropertiesUtil.toString(config.get(PROP_DB), DEFAULT_DB);
         int cacheSize = PropertiesUtil.toInteger(config.get(PROP_CACHE), DEFAULT_CACHE);
+        int offHeap = PropertiesUtil.toInteger(config.get(PROP_OFF_HEAP), DEFAULT_OFF_HEAP);
 
         logger.info("Starting MongoDB MicroKernel with host={}, port={}, db={}",
                 new Object[] {host, port, db});
@@ -95,10 +102,18 @@ public class MongoMicroKernelService {
 
         logger.info("Connected to database {}", mongoDB);
 
-        mk = new MongoMK.Builder()
-                        .memoryCacheSize(cacheSize * MB)
-                        .setMongoDB(mongoDB)
-                        .open();
+        MongoMK.Builder mkbuilder = new MongoMK.Builder();
+
+        if(offHeap != -1){
+            logger.info("Off heap cache support enabled with {} MB",offHeap);
+            mkbuilder.with(new DirectMemoryCacheWrapperFactory(offHeap * MB));
+        }
+
+        //Order of calls is important
+        mkbuilder.memoryCacheSize(cacheSize * MB)
+                .setMongoDB(mongoDB);
+
+        mk = mkbuilder.open();
 
         registerJMXBeans(mk, context);
 
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
index 1dd921f..f4cf619 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
@@ -34,7 +34,7 @@ public class Node implements CacheValue {
     final Revision rev;
     final Map<String, String> properties = Utils.newMap();
     Revision lastRevision;
-    
+
     Node(String path, Revision rev) {
         this.path = path;
         this.rev = rev;
@@ -115,11 +115,10 @@ public class Node implements CacheValue {
     /**
      * A list of children for a node.
      */
-    static class Children implements CacheValue {
+    public static class Children implements CacheValue {
 
         final ArrayList<String> children = new ArrayList<String>();
         boolean hasMore;
-        long offset;
 
         @Override
         public int getMemory() {
@@ -134,7 +133,6 @@ public class Node implements CacheValue {
         public String toString() {
             return children.toString();
         }
-        
     }
 
 }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Serializers.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Serializers.java
new file mode 100644
index 0000000..ea8c644
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Serializers.java
@@ -0,0 +1,157 @@
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+import com.mongodb.BasicDBObject;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+public class Serializers {
+    public static final Class[] PROPERTIES_GENERIC = new Class[]{String.class, String.class};
+
+    public static Serializer<Node> NODE = new Serializer<Node>() {
+
+        @Override
+        public void write(Kryo kryo, Output output, Node n) {
+            output.writeString(n.rev.toString());
+            output.writeString(n.path);
+
+            String lastRev = (n.lastRevision != null) ? n.lastRevision.toString() : null;
+            output.writeString(lastRev);
+
+
+            kryo.writeClass(output,n.properties.getClass());
+            MapSerializer ms = createSerializer(kryo);
+            ms.write(kryo, output, n.properties);
+        }
+
+        @Override
+        public Node read(Kryo kryo, Input input, Class<Node> type) {
+            Revision rev = Revision.fromString(input.readString());
+            String path = input.readString();
+            Node n = new Node(path,rev);
+
+            String lastRev = input.readString();
+            if(lastRev != null){
+                n.setLastRevision(Revision.fromString(lastRev));
+            }
+
+            Class mapClass = kryo.readClass(input).getType();
+            MapSerializer ms = createSerializer(kryo);
+            n.properties.putAll(ms.read(kryo,input,mapClass));
+            return n;
+        }
+
+        private MapSerializer createSerializer(Kryo kryo) {
+            MapSerializer ms = new MapSerializer();
+            ms.setKeysCanBeNull(false);
+            ms.setGenerics(kryo, PROPERTIES_GENERIC);
+            return ms;
+        }
+    };
+
+
+    public static Serializer<Node.Children> CHILDREN = new Serializer<Node.Children>() {
+        @Override
+        public void write(Kryo kryo, Output output, Node.Children c) {
+            output.writeBoolean(c.hasMore);
+
+            CollectionSerializer cs = createSerializer(kryo);
+            cs.write(kryo, output, c.children);
+        }
+
+        @Override
+        public Node.Children read(Kryo kryo, Input input, Class<Node.Children> type) {
+            Node.Children c = new Node.Children();
+            c.hasMore = input.readBoolean();
+
+            CollectionSerializer cs = createSerializer(kryo);
+            //the read method generic type is not correct. So assign it via local var
+            Class clazz = ArrayList.class;
+            c.children.addAll(cs.read(kryo,input,clazz));
+            return c;
+        }
+
+        private CollectionSerializer createSerializer(Kryo kryo) {
+            CollectionSerializer cs = new CollectionSerializer();
+            cs.setGenerics(kryo,new Class[] {String.class});
+            cs.setElementsCanBeNull(false);
+            return cs;
+        }
+    };
+
+    public static Serializer<MongoDocumentStore.CachedDocument> DOCUMENTS = new Serializer<MongoDocumentStore.CachedDocument>() {
+        @Override
+        public void write(Kryo kryo, Output output, MongoDocumentStore.CachedDocument d) {
+            output.writeLong(d.time);
+
+            //Value can be null so need to handle it accordingly
+            Map<String, Object> value = d.value;
+            if(value != null){
+                output.writeBoolean(true);
+                kryo.writeClass(output,d.value.getClass());
+
+                MapSerializer ms = createSerializer(kryo);
+                ms.write(kryo,output,d.value);
+            }else{
+                output.writeBoolean(false);
+            }
+        }
+
+        @Override
+        public MongoDocumentStore.CachedDocument read(Kryo kryo, Input input, Class<MongoDocumentStore.CachedDocument> type) {
+            long time = input.readLong();
+
+            MongoDocumentStore.CachedDocument d = null;
+            boolean valueNotNull = input.readBoolean();
+            if(valueNotNull){
+                Class mapType = kryo.readClass(input).getType();
+
+                MapSerializer ms = createSerializer(kryo);
+                Map<String,Object> data = ms.read(kryo,input,mapType);
+                d = new MongoDocumentStore.CachedDocument(data,time);
+            }else{
+                d = new MongoDocumentStore.CachedDocument(null,time);
+            }
+            return d;
+        }
+
+        private MapSerializer createSerializer(Kryo kryo) {
+            MapSerializer ms = new MapSerializer();
+            ms.setKeysCanBeNull(false);
+            ms.setKeyClass(String.class,kryo.getSerializer(String.class));
+            return ms;
+        }
+    };
+
+    public static Serializer BASIC_DB_OBJECT = new MapSerializer(){
+        @Override
+        public void write(Kryo kryo, Output output, Map map) {
+            output.writeBoolean(((BasicDBObject)map).isPartialObject());
+            super.write(kryo, output, map);
+        }
+
+        @Override
+        protected Map create(Kryo kryo, Input input, Class<Map> type) {
+            BasicDBObject bdo = new BasicDBObject();
+            if(input.readBoolean()){
+                bdo.markAsPartialObject();
+            }
+            return bdo;
+        }
+
+        @Override
+        protected Map createCopy(Kryo kryo, Map original) {
+            BasicDBObject bdo = new BasicDBObject();
+            if(original instanceof BasicDBObject && ((BasicDBObject) original).isPartialObject()){
+                bdo.markAsPartialObject();
+            }
+            return bdo;
+        }
+    };
+}
diff --git a/oak-core/src/main/resources/OSGI-INF/metatype/metatype.properties b/oak-core/src/main/resources/OSGI-INF/metatype/metatype.properties
index 1c5a3b2..4b002e0 100644
--- a/oak-core/src/main/resources/OSGI-INF/metatype/metatype.properties
+++ b/oak-core/src/main/resources/OSGI-INF/metatype/metatype.properties
@@ -31,4 +31,7 @@ db.name = MongoDB Database
 db.description = The database to use.
 
 cache.name = Cache Size (MB)
-cache.description = Cache Size (MB)
\ No newline at end of file
+cache.description = Cache Size (MB)
+
+offheap.name = OffHeap Cache Size (MB)
+offheap.description = Off Heap Cache Size (MB)
