Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (working copy) @@ -24,9 +24,11 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.PropertyState; @@ -42,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableSortedMap; import com.google.common.hash.Hashing; /** @@ -52,7 +55,7 @@ /** Logger instance */ private static final Logger log = LoggerFactory.getLogger(Compactor.class); - public static void compact(SegmentStore store) { + public static ByteBuffer compact(SegmentStore store) { SegmentWriter writer = store.getTracker().getWriter(); Compactor compactor = new Compactor(writer); @@ -74,8 +77,113 @@ before = head; after = builder.getNodeState(); } + return mapToByteBuffer(compactor.getCompacted()); + } + + /** + * Serializes the records map to a ByteBuffer, this allows the records to be + * GC'ed while maintaining a fast lookup structure. + */ + static ByteBuffer mapToByteBuffer(Map in) { + ByteBuffer buffer = ByteBuffer.allocate(in.size() * 48); + Map sort = ImmutableSortedMap.copyOf(in); + for (Entry e : sort.entrySet()) { + long[] k = recordAsKey(e.getKey()); + buffer.putLong(k[0]); + buffer.putLong(k[1]); + buffer.putLong(k[2]); + long[] v = recordAsKey(e.getValue()); + buffer.putLong(v[0]); + buffer.putLong(v[1]); + buffer.putLong(v[2]); + } + return buffer; } + /** + * Locks down the RecordId persistence structure + */ + static long[] recordAsKey(RecordId r) { + return new long[] { r.getSegmentId().getMostSignificantBits(), + r.getSegmentId().getLeastSignificantBits(), r.getOffset() }; + } + + /** + * Looks for the mapping for a given entry, if none is found, it returns the + * original key + */ + static long[] readEntry(ByteBuffer compaction, long[] entry) { + int position = findEntry(compaction, entry[0], entry[1], entry[2]); + if (position != -1) { + long msb = compaction.getLong(position + 24); + long lsb = compaction.getLong(position + 32); + long offset = compaction.getLong(position + 40); + return new long[] { msb, lsb, offset }; + } + return entry; + } + + private static int findEntry(ByteBuffer index, long msb, long lsb, long offset) { + + // this a copy of the TarReader#findEntry with tiny changes around the + // entry sizes + + // The segment identifiers are randomly generated with uniform + // distribution, so we can use interpolation search to find the + // matching entry in the index. The average runtime is O(log log n). + + int entrySize = 8*6; + + int lowIndex = 0; + int highIndex = /* index.remaining() / */ index.capacity() / entrySize -1; + float lowValue = Long.MIN_VALUE; + float highValue = Long.MAX_VALUE; + float targetValue = msb; + + while (lowIndex <= highIndex) { + int guessIndex = lowIndex + Math.round( + (highIndex - lowIndex) + * (targetValue - lowValue) + / (highValue - lowValue)); + int position = /* index.position() + */ guessIndex * entrySize; + long m = index.getLong(position); + if (msb < m) { + highIndex = guessIndex - 1; + highValue = m; + } else if (msb > m) { + lowIndex = guessIndex + 1; + lowValue = m; + } else { + // getting close... + long l = index.getLong(position + 8); + if (lsb < l) { + highIndex = guessIndex - 1; + highValue = m; + } else if (lsb > l) { + lowIndex = guessIndex + 1; + lowValue = m; + } else { + // getting even closer... + long o = index.getLong(position + 16); + if (offset < o) { + highIndex = guessIndex - 1; + highValue = m; + } else if (offset > o) { + lowIndex = guessIndex + 1; + lowValue = m; + } else { + // found it! + return position; + } + } + } + } + + // not found + return -1; + } + + private final SegmentWriter writer; /** @@ -255,4 +363,8 @@ } } + public Map getCompacted() { + return compacted; + } + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapRecord.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapRecord.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapRecord.java (working copy) @@ -364,7 +364,7 @@ } boolean compare(MapRecord before, final NodeStateDiff diff) { - if (fastEquals(this, before)) { + if (fastEquals(this, before, getStore())) { return true; } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java (working copy) @@ -16,6 +16,11 @@ */ package org.apache.jackrabbit.oak.plugins.segment; +import static org.apache.jackrabbit.oak.plugins.segment.Compactor.readEntry; +import static org.apache.jackrabbit.oak.plugins.segment.Compactor.recordAsKey; + +import java.nio.ByteBuffer; + import javax.annotation.Nonnull; /** @@ -23,16 +28,22 @@ */ class Record { - static boolean fastEquals(Object a, Object b) { - return a instanceof Record && fastEquals((Record) a, b); + static boolean fastEquals(Object a, Object b, SegmentStore store) { + return a instanceof Record && fastEquals((Record) a, b, store); } - static boolean fastEquals(Record a, Object b) { - return b instanceof Record && fastEquals(a, (Record) b); + static boolean fastEquals(Record a, Object b, SegmentStore store) { + return b instanceof Record && fastEquals(a, (Record) b, store); } - static boolean fastEquals(Record a, Record b) { - return a.segmentId == b.segmentId && a.offset == b.offset; + static boolean fastEquals(Record a, Record b, SegmentStore store) { + ByteBuffer compaction = store.getCompactionMap(); + if (compaction == null) { + return a.segmentId == b.segmentId && a.offset == b.offset; + } + long[] aId = readEntry(compaction, recordAsKey(a.getRecordId())); + long[] bId = readEntry(compaction, recordAsKey(b.getRecordId())); + return aId[0] == bId[0] && aId[1] == bId[1] && aId[2] == bId[2]; } /** @@ -69,6 +80,15 @@ } /** + * Returns the segment store. + * + * @return segment store + */ + public SegmentStore getStore() { + return segmentId.getTracker().getStore(); + } + + /** * Returns the identifier of this record. * * @return record identifier @@ -113,7 +133,7 @@ @Override public boolean equals(Object that) { - return fastEquals(this, that); + return fastEquals(this, that, getStore()); } @Override Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java (working copy) @@ -178,7 +178,7 @@ @Override public boolean equals(Object object) { - if (object == this || fastEquals(this, object)) { + if (object == this || fastEquals(this, object, getStore())) { return true; } else { return object instanceof Blob Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java (working copy) @@ -381,7 +381,7 @@ @Override public boolean compareAgainstBaseState(NodeState base, NodeStateDiff diff) { - if (this == base || fastEquals(this, base)) { + if (this == base || fastEquals(this, base, getStore())) { return true; // no changes } else if (base == EMPTY_NODE || !base.exists()) { // special case return EmptyNodeState.compareAgainstEmptyState(this, diff); @@ -476,7 +476,7 @@ if (!diff.childNodeAdded(afterChildName, afterNode)) { return false; } - } else if (!fastEquals(afterNode, beforeNode)) { + } else if (!fastEquals(afterNode, beforeNode, getStore())) { if (!diff.childNodeChanged( afterChildName, beforeNode, afterNode)) { return false; @@ -512,7 +512,7 @@ NodeState beforeChild = beforeTemplate.getChildNode(beforeChildName, beforeId); if (beforeChild.exists()) { - if (!fastEquals(afterChild, beforeChild) + if (!fastEquals(afterChild, beforeChild, getStore()) && !diff.childNodeChanged( childName, beforeChild, afterChild)) { return false; @@ -550,7 +550,7 @@ @Override public boolean equals(Object object) { - if (this == object || fastEquals(this, object)) { + if (this == object || fastEquals(this, object, getStore())) { return true; } else if (object instanceof SegmentNodeState) { SegmentNodeState that = (SegmentNodeState) object; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (working copy) @@ -150,7 +150,7 @@ NodeState root = getRoot(); SegmentNodeState before = snb.getBaseState(); - if (!fastEquals(before, root)) { + if (!fastEquals(before, root, store)) { SegmentNodeState after = snb.getNodeState(); snb.reset(root); after.compareAgainstBaseState( @@ -328,7 +328,7 @@ private SegmentNodeBuilder prepare() throws CommitFailedException { SegmentNodeState state = head.get(); SegmentNodeBuilder builder = state.builder(); - if (fastEquals(before, state.getChildNode(ROOT))) { + if (fastEquals(before, state.getChildNode(ROOT), store)) { // use a shortcut when there are no external changes builder.setChildNode( ROOT, hook.processCommit(before, after, info)); @@ -416,7 +416,7 @@ NodeState execute() throws CommitFailedException, InterruptedException { // only do the merge if there are some changes to commit - if (!fastEquals(before, after)) { + if (!fastEquals(before, after, store)) { long timeout = optimisticMerge(); if (timeout >= 0) { pessimisticMerge(timeout); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (working copy) @@ -16,6 +16,8 @@ */ package org.apache.jackrabbit.oak.plugins.segment; +import java.nio.ByteBuffer; + import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -84,4 +86,7 @@ */ void gc(); + @CheckForNull + ByteBuffer getCompactionMap(); + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java (working copy) @@ -240,7 +240,7 @@ // TODO: Leverage the HAMT data structure for the comparison MapRecord thisMap = getChildNodeMap(thisId); MapRecord thatMap = getChildNodeMap(thatId); - if (fastEquals(thisMap, thatMap)) { + if (fastEquals(thisMap, thatMap, thisMap.getStore())) { return true; // shortcut } else if (thisMap.size() != thatMap.size()) { return false; // shortcut Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy) @@ -137,6 +137,12 @@ private int compactThreshold = 10; /** + * Serialized map that contains the link between the old record ids and new record ids + * of the compacted states. + */ + private final AtomicReference compactionMap; + + /** * List of old tar file generations that are waiting to be removed. */ private final LinkedList toBeRemoved = newLinkedList(); @@ -266,6 +272,8 @@ flushThread.setPriority(Thread.MIN_PRIORITY); flushThread.start(); + compactionMap = new AtomicReference(null); + log.info("TarMK opened: {} (mmap={})", directory, memoryMapping); } @@ -410,10 +418,11 @@ private void compact() { if (compactNeeded.getAndSet(false)) { long start = System.nanoTime(); - Compactor.compact(this); + compactionMap.set(Compactor.compact(this)); log.info("TarMK Compaction: Completed in {}ms", MILLISECONDS .convert(System.nanoTime() - start, NANOSECONDS)); cleanupNeeded.set(true); + System.gc(); } } @@ -615,7 +624,6 @@ @Override public void gc() { - System.gc(); compactNeeded.set(true); } @@ -627,4 +635,9 @@ return index; } + @Override + public ByteBuffer getCompactionMap() { + return compactionMap.get(); + } + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java (working copy) @@ -39,6 +39,7 @@ import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import com.google.common.io.ByteStreams; + import org.apache.jackrabbit.oak.spi.blob.BlobStore; public class HttpStore implements SegmentStore { @@ -167,4 +168,9 @@ // TODO: distributed gc } + @Override + public ByteBuffer getCompactionMap() { + return null; + } + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java (revision 1600576) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java (working copy) @@ -125,4 +125,9 @@ segments.keySet().retainAll(tracker.getReferencedSegmentIds()); } + @Override + public ByteBuffer getCompactionMap() { + return null; + } + } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (revision 0) @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import static org.apache.jackrabbit.oak.plugins.segment.Compactor.mapToByteBuffer; +import static org.apache.jackrabbit.oak.plugins.segment.Compactor.readEntry; +import static org.apache.jackrabbit.oak.plugins.segment.Compactor.recordAsKey; +import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS; +import static org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; + +import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore; +import org.junit.Test; + +public class CompactorTest { + + @Test + public void mapSerializationTest() { + + final int maxExistingEntries = 100000; + final int maxNonExistingEntries = 10000; + final int seed = new Random().nextInt(); + + SegmentTracker factory = new MemoryStore().getTracker(); + Map map = new HashMap(); + + Random r = new Random(seed); + int existing = r.nextInt(maxExistingEntries); + int nonExisting = r.nextInt(maxNonExistingEntries); + + for (int i = 0; i < existing; i++) { + RecordId k = new RecordId(factory.newDataSegmentId(), + asValidOffset(r.nextInt(MAX_SEGMENT_SIZE))); + RecordId v = new RecordId(factory.newDataSegmentId(), + asValidOffset(r.nextInt(MAX_SEGMENT_SIZE))); + map.put(k, v); + } + ByteBuffer compaction = mapToByteBuffer(map); + + // not serialized, expecting the same value back + for (int i = 0; i < nonExisting; i++) { + RecordId k = new RecordId(factory.newDataSegmentId(), + asValidOffset(r.nextInt(MAX_SEGMENT_SIZE))); + assertFalse("Clash on recordids", map.containsKey(k)); + map.put(k, k); + } + System.out.println(map.size()); + + for (Entry e : map.entrySet()) { + RecordId k = e.getKey(); + long[] v = recordAsKey(e.getValue()); + long[] vl = readEntry(compaction, recordAsKey(k)); + assertArrayEquals("Failed with seed " + seed, vl, v); + } + } + + private int asValidOffset(int random) { + while (random > 0) { + if (random % (1 << RECORD_ALIGN_BITS) == 0) { + return random; + } + random--; + } + return random; + } +}