diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
index bbf5d30..c0d2b88 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
@@ -149,6 +149,29 @@ public class CompactionMap {
         }
     }
 
+    Map<RecordId, RecordId> getReferencedRecords(SegmentId segmentId){
+        Map<RecordId, RecordId> recIds = newHashMap();
+
+        long msb = segmentId.getMostSignificantBits();
+        long lsb = segmentId.getLeastSignificantBits();
+        int entry = findEntry(msb, lsb);
+
+        if (entry != -1) {
+            int index = entryIndex[entry];
+            int limit = entryIndex[entry + 1];
+            for (int i = index; i < limit; i++) {
+                int o = (beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS;
+                int n = (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS;
+                recIds.put(
+                        new RecordId(segmentId, o),
+                        new RecordId(afterSegmentIds[i],n)
+                );
+            }
+        }
+
+        return recIds;
+    }
+
     void compress() {
         Set<UUID> uuids = newTreeSet();
 
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
index ec8fbfd..c46a39a 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
@@ -16,8 +16,12 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import java.util.List;
+
 import javax.annotation.Nonnull;
 
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
+
 /**
  * Record within a segment.
  */
@@ -32,18 +36,34 @@ class Record {
     }
 
     static boolean fastEquals(Record a, Record b) {
-        return a.segmentId == b.segmentId && a.offset == b.offset;
+        boolean result = a.segmentId == b.segmentId && a.offset == b.offset;
+        if (!result && (!a.oldIds.isEmpty() || !b.oldIds.isEmpty())) {
+            for (RecordId ra : a.oldIds) {
+                for (RecordId rb : b.oldIds) {
+                    if (ra.getSegmentId() == rb.getSegmentId() && ra.getOffset() == rb.getOffset()) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return result;
     }
 
     /**
      * Identifier of the segment that contains this record.
      */
-    private final SegmentId segmentId;
+    private volatile SegmentId segmentId;
 
     /**
      * Segment offset of this record.
      */
-    private final int offset;
+    private volatile int offset;
+
+    /**
+     * Record ids this record had before being relinked to a compacted
+     * segment. Usually empty, hence created with zero initial capacity.
+     */
+    private final List<RecordId> oldIds = newArrayListWithCapacity(0);
 
     /**
      * Creates a new object for the identified record.
@@ -79,7 +99,27 @@ class Record {
      * @return segment that contains this record
      */
     protected Segment getSegment() {
-        return segmentId.getSegment();
+        while (true) {
+            try {
+                return segmentId.getSegment();
+            } catch (SegmentNotFoundException e) {
+                RecordId newId = getTracker().getCompactionMap().get(getRecordId());
+                if (newId != null) {
+                    resetRecordId(newId);
+                    segmentRelinked();
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Callback invoked when a segment associated with current record
+     * is relinked
+     */
+    protected void segmentRelinked() {
+
     }
 
     /**
@@ -123,6 +163,10 @@ class Record {
         return getOffset(bytes + ids * Segment.RECORD_ID_BYTES);
     }
 
+    protected boolean isStale(){
+        return segmentId.isStale();
+    }
+
     //------------------------------------------------------------< Object >--
 
     @Override
@@ -140,4 +184,9 @@ class Record {
         return getRecordId().toString();
     }
 
+    private void resetRecordId(RecordId newId){
+        oldIds.add(getRecordId());
+        segmentId = newId.getSegmentId();
+        offset = newId.getOffset();
+    }
 }
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
index 7ed19a0..d55c452 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
@@ -46,6 +46,8 @@ public class SegmentId implements Comparable<SegmentId> {
 
     private final long lsb;
 
+    private final long creationTime;
+
     /**
      * A reference to the segment object, if it is available in memory. It is
      * used for fast lookup. The segment tracker will set or reset this field.
@@ -53,15 +55,18 @@ public class SegmentId implements Comparable<SegmentId> {
     // TODO: possibly we could remove the volatile
     private volatile Segment segment;
 
-    public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) {
+    private volatile boolean stale;
+
+    public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment, long birthTime) {
         this.tracker = tracker;
         this.msb = msb;
         this.lsb = lsb;
         this.segment = segment;
+        this.creationTime = birthTime;
     }
 
     public SegmentId(SegmentTracker tracker, long msb, long lsb) {
-        this(tracker, msb, lsb, null);
+        this(tracker, msb, lsb, null, System.currentTimeMillis());
     }
 
     /**
@@ -102,6 +107,7 @@ public class SegmentId implements Comparable<SegmentId> {
                 if (segment == null) {
                     log.debug("Loading segment {}", this);
                     segment = tracker.getSegment(this);
+                    stale = false;
                 }
             }
         }
@@ -117,6 +123,19 @@ public class SegmentId implements Comparable<SegmentId> {
         return tracker;
     }
 
+    public long getCreationTime() {
+        return creationTime;
+    }
+
+    public synchronized void markStale(){
+        this.segment = null;
+        this.stale = true;
+    }
+
+    public boolean isStale(){
+        return stale;
+    }
+
     //--------------------------------------------------------< Comparable >--
 
     @Override
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
index 6c14a5e..69ec725 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
@@ -76,7 +76,7 @@ public class SegmentIdTable {
             reference = references.get(index);
         }
 
-        SegmentId id = new SegmentId(tracker, msb, lsb);
+        SegmentId id = new SegmentId(tracker, msb, lsb, null, tracker.getClock().getTime());
         references.set(index, new WeakReference<SegmentId>(id));
         if (index != first) {
             refresh();
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java
index 7fedb1b..79d5a9e 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java
@@ -22,14 +22,20 @@ import java.util.List;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.AbstractChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.AbstractNodeState;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -53,6 +59,7 @@ import static org.apache.jackrabbit.oak.spi.state.AbstractNodeState.checkValidNa
  * currently doesn't cache data (but the template is fully loaded).
  */
 public class SegmentNodeState extends Record implements NodeState {
+    private static final Logger log = LoggerFactory.getLogger(SegmentNodeState.class);
 
     private volatile RecordId templateId = null;
 
@@ -63,6 +70,10 @@ public class SegmentNodeState extends Record implements NodeState {
     }
 
     RecordId getTemplateId() {
+        if (templateId != null && isStale()) {
+            templateId = null;
+        }
+
         if (templateId == null) {
             // no problem if updated concurrently,
             // as each concurrent thread will just get the same value
@@ -72,6 +83,11 @@ public class SegmentNodeState extends Record implements NodeState {
     }
 
     Template getTemplate() {
+        if (template != null && isStale()) {
+            templateId = null;
+            template = null;
+        }
+
         if (template == null) {
             // no problem if updated concurrently,
             // as each concurrent thread will just get the same value
@@ -547,6 +563,12 @@ public class SegmentNodeState extends Record implements NodeState {
         return true;
     }
 
+    @Override
+    protected void segmentRelinked() {
+        this.templateId = null;
+        this.template = null;
+    }
+
     private static boolean compareProperties(
             PropertyState before, PropertyState after, NodeStateDiff diff) {
         if (before == null) {
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNotFoundException.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNotFoundException.java
new file mode 100644
index 0000000..2ba89a0
--- /dev/null
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNotFoundException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment;
+
+public class SegmentNotFoundException extends IllegalStateException {
+
+    public SegmentNotFoundException(String s) {
+        super(s);
+    }
+}
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
index 5798188..4cf8a99 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
@@ -23,6 +23,7 @@ import static com.google.common.collect.Sets.newIdentityHashSet;
 
 import java.security.SecureRandom;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,9 +93,12 @@ public class SegmentTracker {
 
     private final LinkedList<Segment> segments = newLinkedList();
 
+    private final Clock clock;
+
     private long currentSize = 0;
 
-    public SegmentTracker(SegmentStore store, int cacheSizeMB) {
+    public SegmentTracker(SegmentStore store, int cacheSizeMB, Clock clock) {
+        this.clock = clock;
         for (int i = 0; i < tables.length; i++) {
             tables[i] = new SegmentIdTable(this);
         }
@@ -103,8 +108,12 @@ public class SegmentTracker {
         this.cacheSize = cacheSizeMB * MB;
     }
 
+    public SegmentTracker(SegmentStore store, Clock clock) {
+        this(store, DEFAULT_MEMORY_CACHE_SIZE, clock);
+    }
+
     public SegmentTracker(SegmentStore store) {
-        this(store, DEFAULT_MEMORY_CACHE_SIZE);
+        this(store, DEFAULT_MEMORY_CACHE_SIZE, Clock.SIMPLE);
     }
 
     public SegmentWriter getWriter() {
@@ -157,9 +166,40 @@ public class SegmentTracker {
     }
 
     public void setCompactionMap(CompactionMap compaction) {
+        CompactionMap previous = compactionMap.get();
+        Set<SegmentId> referencedIds = getReferencedSegmentIds();
+
+        for (SegmentId id : referencedIds) {
+            if (id.isStale()) {
+                Map<RecordId, RecordId> recIds = previous.getReferencedRecords(id);
+                for (Map.Entry<RecordId, RecordId> e : recIds.entrySet()) {
+                    RecordId value = e.getValue();
+                    RecordId result = null;
+                    //Trace the chain in previous map
+                    while((result = previous.get(value)) != null){
+                        value = result;
+                    }
+
+                    //Check also in latest
+                    result = compaction.get(value);
+                    if( result != null) {
+                        value = result;
+                    }
+
+                    compaction.put(e.getKey(), value);
+                }
+            }
+        }
+
+        compaction.compress();
         compactionMap.set(compaction);
     }
 
+    public synchronized void dropCache() {
+        segments.clear();
+        currentSize = 0;
+    }
+
     @Nonnull
     CompactionMap getCompactionMap() {
         return compactionMap.get();
@@ -220,6 +260,10 @@ public class SegmentTracker {
         return tables[index].getSegmentId(msb, lsb);
     }
 
+    public Clock getClock() {
+        return clock;
+    }
+
     SegmentId newDataSegmentId() {
         return newSegmentId(DATA);
     }
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
index 7ec4a62..80756cd 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
@@ -36,6 +36,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileLock;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -56,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
@@ -63,6 +66,7 @@ import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,7 +154,7 @@ public class FileStore implements SegmentStore {
 
     public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB, boolean memoryMapping)
             throws IOException {
-        this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, memoryMapping);
+        this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, memoryMapping, Clock.SIMPLE);
     }
 
     public FileStore(File directory, int maxFileSizeMB, boolean memoryMapping)
@@ -165,18 +169,18 @@ public class FileStore implements SegmentStore {
 
     public FileStore(File directory, int maxFileSizeMB, int cacheSizeMB,
             boolean memoryMapping) throws IOException {
-        this(null, directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping);
+        this(null, directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping, Clock.SIMPLE);
     }
 
     public FileStore(
             BlobStore blobStore, final File directory, NodeState initial,
-            int maxFileSizeMB, int cacheSizeMB, boolean memoryMapping)
+            int maxFileSizeMB, int cacheSizeMB, boolean memoryMapping, Clock clock)
             throws IOException {
         checkNotNull(directory).mkdirs();
         if (cacheSizeMB > 0) {
-            this.tracker = new SegmentTracker(this, cacheSizeMB);
+            this.tracker = new SegmentTracker(this, cacheSizeMB, clock);
         } else {
-            this.tracker = new SegmentTracker(this);
+            this.tracker = new SegmentTracker(this, clock);
         }
         this.blobStore = blobStore;
         this.directory = directory;
@@ -435,6 +439,12 @@ public class FileStore implements SegmentStore {
         }
     }
 
+    public synchronized void cleanup() throws IOException {
+        // Drop the in-memory segment cache so stale segment references are re-resolved
+        getTracker().dropCache();
+        this.cleanup(tracker.getClock().getTime() - TimeUnit.MINUTES.toMillis(2));
+    }
+
     /**
      * Runs garbage collection on the segment level, which could write new
      * generations of tar files. It checks which segments are still reachable,
@@ -443,7 +453,7 @@ public class FileStore implements SegmentStore {
      * A new generation of a tar file is created (and segments are only
      * discarded) if doing so releases more than 25% of the space in a tar file.
      */
-    public synchronized void cleanup() throws IOException {
+    public synchronized void cleanup(long lastCreationTime) throws IOException {
         Stopwatch watch = Stopwatch.createStarted();
         long initialSize = size();
         log.info("TarMK revision cleanup started. Current repository size {}",
@@ -453,12 +463,28 @@ public class FileStore implements SegmentStore {
         // to clear stale weak references in the SegmentTracker
         System.gc();
 
+        if(lastCreationTime > 0){
+            log.info("SegmentIds older than {} will not be considered while determining the root set",
+                    new Date(lastCreationTime));
+        }
+
+        final SegmentId headId = getHead().getRecordId().getSegmentId();
+        int staleCount = 0;
         Set<UUID> ids = newHashSet();
-        for (SegmentId id : tracker.getReferencedSegmentIds()) {
+        Set<SegmentId> referencedIds = tracker.getReferencedSegmentIds();
+        for (SegmentId id : referencedIds) {
+            if(lastCreationTime > 0
+                    && id.getCreationTime() < lastCreationTime
+                    && id != headId){
+                staleCount++;
+                id.markStale();
+                continue;
+            }
             ids.add(new UUID(
                     id.getMostSignificantBits(),
                     id.getLeastSignificantBits()));
         }
+        log.info("Marked {} of {} segment ids as stale", staleCount, referencedIds.size());
         writer.cleanup(ids);
 
         List<TarReader> list =
@@ -669,7 +695,7 @@ public class FileStore implements SegmentStore {
             }
         }
 
-        throw new IllegalStateException("Segment " + id + " not found");
+        throw new SegmentNotFoundException("Segment " + id + " not found");
     }
 
     @Override
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
index df0a77b..446e7d9 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
@@ -26,6 +26,7 @@ import javax.annotation.Nonnull;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
@@ -92,7 +93,7 @@ public class MemoryStore implements SegmentStore {
         if (segment != null) {
             return segment;
         } else {
-            throw new IllegalArgumentException("Segment not found: " + id);
+            throw new SegmentNotFoundException("Segment not found: " + id);
         }
     }
 
diff --git oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
index 90d4f11..80e262a 100644
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
@@ -33,17 +33,20 @@ import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class CompactionAndCleanupTest {
 
     private File directory;
+    public static final int MB = 1024 * 1024;
 
     @Before
     public void setUp() throws IOException {
@@ -54,7 +57,6 @@ public class CompactionAndCleanupTest {
     }
 
     @Test
-    @Ignore("OAK-2045")
     public void compactionAndWeakReferenceMagic() throws Exception{
         final int MB = 1024 * 1024;
         final int blobSize = 5 * MB;
@@ -118,6 +120,74 @@ public class CompactionAndCleanupTest {
         assertEquals(mb(fileStore.size()), mb(blobSize));
     }
 
+    @Test
+    public void compactionAndAgeing() throws Exception{
+        final int blobSize = 5 * MB;
+
+        Clock clock = new Clock.Virtual();
+        FileStore fileStore = new FileStore(null,directory, EMPTY_NODE, 1, 1, false, clock);
+        SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
+
+        //1. Create a property with 5 MB blob
+        NodeBuilder builder = nodeStore.getRoot().builder();
+        builder.setProperty("a1", createBlob(nodeStore, blobSize));
+        builder.setProperty("b", "foo");
+        builder.child("bar").setProperty("name", "batman");
+
+        //Keep a reference to this nodeState to simulate long
+        //running session
+        NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        NodeState ns2 = ns1.getChildNode("bar");
+        long removalTime = clock.getTime() + 1;
+        clock.waitUntil(removalTime);
+
+        System.out.printf("File store pre removal %d%n", mb(fileStore.size()));
+        assertEquals(mb(fileStore.size()), mb(blobSize));
+        assertEquals("foo", ns1.getString("b"));
+
+
+        //2. Now remove the property and update the name
+        builder = nodeStore.getRoot().builder();
+        builder.removeProperty("a1");
+        builder.child("bar").setProperty("name", "joker");
+        builder.child("city").setProperty("name", "gotham");
+        builder.child("foo").child("bar").setProperty("name", "batwoman");
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        //Size remains same
+        System.out.printf("File store pre compaction %d%n", mb(fileStore.size()));
+        assertEquals(mb(fileStore.size()), mb(blobSize));
+
+        //3. Compact
+        fileStore.compact();
+
+        //Size still remains same
+        System.out.printf("File store post compaction %d%n", mb(fileStore.size()));
+        assertEquals(mb(fileStore.size()), mb(blobSize));
+
+        //4. Add some more property to flush the current TarWriter
+        builder = nodeStore.getRoot().builder();
+        builder.setProperty("a2", createBlob(nodeStore, blobSize));
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        //Size is double
+        System.out.printf("File store pre cleanup %d%n", mb(fileStore.size()));
+        assertEquals(mb(fileStore.size()), 2 * mb(blobSize));
+
+        //5. Cleanup. Ignore references with time earlier than removal time
+        fileStore.cleanup(removalTime);
+
+        //Size should now come back to 5 and deleted data
+        //space reclaimed
+        System.out.printf("File store post cleanup and nullification %d%n", mb(fileStore.size()));
+        assertEquals(mb(fileStore.size()), mb(blobSize));
+
+        //Old state should be linked to current head now
+        assertEquals("joker", ns1.getChildNode("bar").getString("name"));
+        assertEquals("gotham", ns1.getChildNode("city").getString("name"));
+        assertEquals("joker", ns2.getString("name"));
+    }
+
     @After
     public void cleanDir() throws IOException {
         FileUtils.deleteDirectory(directory);
@@ -126,7 +196,7 @@ public class CompactionAndCleanupTest {
     private static void cleanup(FileStore fileStore) throws IOException {
         fileStore.getTracker().setCompactionMap(new Compactor(null).getCompactionMap());
         fileStore.getTracker().getWriter().dropCache();
-
+        fileStore.getTracker().dropCache();
         fileStore.cleanup();
     }
 
@@ -142,7 +212,6 @@ public class CompactionAndCleanupTest {
 
     @Test
     public void testGainEstimator() throws Exception {
-        final int MB = 1024 * 1024;
         final int blobSize = 2 * MB;
 
         FileStore fileStore = new FileStore(directory, 2, false);
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java
index 377fbc9..1759f71 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java
@@ -36,6 +36,7 @@ import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.stats.Clock;
 
 public abstract class OakFixture {
 
@@ -385,7 +386,7 @@ public abstract class OakFixture {
                 stores[i] = new FileStore(blobStore,
                         new File(base, unique),
                         EmptyNodeState.EMPTY_NODE,
-                        maxFileSizeMB, cacheSizeMB, memoryMapping);
+                        maxFileSizeMB, cacheSizeMB, memoryMapping, Clock.SIMPLE);
                 cluster[i] = new Oak(new SegmentNodeStore(stores[i]));
             }
             return cluster;
