Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java	(revision 1636540)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java	(working copy)
@@ -77,6 +77,8 @@
     private SegmentId[] afterSegmentIds = new SegmentId[0];
 
     private short[] afterOffsets = new short[0];
 
+    private CompactionMap prev;
+
     CompactionMap(int compressInterval) {
         this.compressInterval = compressInterval;
     }
@@ -90,23 +92,49 @@
      * @return whether {@code before} was compacted to {@code after}
      */
     boolean wasCompactedTo(RecordId before, RecordId after) {
-        return after.equals(get(before));
+        if (isDisabled()) {
+            return false;
+        }
+        return recursiveWasCompactedTo(before, after);
     }
 
     /**
-     * Checks whether content in the segment with the given identifier was
-     * compacted to new segments.
-     *
-     * @param id segment identifier
-     * @return whether the identified segment was compacted
+     * Recursively checks whether {@code before} was compacted to
+     * {@code after}, following the chain of previous compaction maps
+     * via {@link #prev}.
+     *
+     * @param before before record identifier
+     * @param after after record identifier
+     * @return whether {@code before} was compacted to {@code after}
      */
-    boolean wasCompacted(SegmentId id) {
-        long msb = id.getMostSignificantBits();
-        long lsb = id.getLeastSignificantBits();
-        return findEntry(msb, lsb) != -1;
+    private boolean recursiveWasCompactedTo(RecordId before,
+            RecordId after) {
+        RecordId potentialAfter = recursiveGet(this, before);
+        if (potentialAfter == null) {
+            // no mapping exists anywhere in the chain
+            return false;
+        }
+        if (after.equals(potentialAfter)) {
+            return true;
+        }
+        // the record may have been compacted again; follow the new record id
+        return recursiveWasCompactedTo(potentialAfter, after);
     }
 
-    public RecordId get(RecordId before) {
+    private RecordId recursiveGet(CompactionMap map, RecordId before) {
+        RecordId after = map.get(before);
+        if (after != null) {
+            return after;
+        }
+        if (map.prev != null) {
+            return recursiveGet(map.prev, before);
+        }
+        return null;
+    }
+
+    protected RecordId get(RecordId before) {
+        if (isDisabled()) {
+            return null;
+        }
         RecordId after = recent.get(before);
         if (after != null) {
             return after;
@@ -142,6 +170,9 @@
      * added entry is not supported.
      */
     void put(RecordId before, RecordId after) {
+        if (isDisabled()) {
+            return;
+        }
         assert get(before) == null;
         recent.put(before, after);
         if (recent.size() >= compressInterval) {
@@ -150,6 +181,10 @@
     }
 
     void compress() {
+        if (recent.isEmpty()) {
+            // nothing to compress
+            return;
+        }
         Set<UUID> uuids = newTreeSet();
         Map<UUID, Map<Integer, RecordId>> mapping = newTreeMap();
@@ -294,4 +329,17 @@
         return -1;
     }
 
+    private boolean isDisabled() {
+        return compressInterval == -1;
+    }
+
+    public CompactionMap getPrev() {
+        return prev;
+    }
+
+    public void setPrev(CompactionMap prev) {
+        this.prev = prev;
+        this.prev.compress();
+    }
+
 }
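Note: each call to setPrev() above links a new compaction map generation to its
predecessor, and lookups walk down that chain. The following self-contained
sketch models the behavior; ChainedMap and its String keys are illustrative
stand-ins for CompactionMap and RecordId, not Oak API:

    import java.util.HashMap;
    import java.util.Map;

    class ChainedMap {
        private final Map<String, String> entries = new HashMap<String, String>();
        private ChainedMap prev;

        void put(String before, String after) { entries.put(before, after); }
        void setPrev(ChainedMap prev) { this.prev = prev; }

        // walk down the prev chain until a mapping is found, as in recursiveGet()
        String get(String before) {
            String after = entries.get(before);
            if (after != null) {
                return after;
            }
            return prev == null ? null : prev.get(before);
        }

        // follow successive mappings, as in recursiveWasCompactedTo(): a record
        // compacted in one generation may be compacted again in the next
        boolean wasCompactedTo(String before, String after) {
            String next = get(before);
            if (next == null) {
                return false;
            }
            return next.equals(after) || wasCompactedTo(next, after);
        }
    }

With m1 = {a->b} and m2 = {b->c} where m2.setPrev(m1), m2.wasCompactedTo("a", "c")
holds: the lookup finds a->b in the older generation, then b->c in the newer one.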
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionStrategy.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionStrategy.java	(revision 0)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionStrategy.java	(revision 0)
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+public interface CompactionStrategy {
+
+    boolean remove(SegmentId sid);
+
+    boolean cloneBinaries();
+
+    boolean paused();
+
+}
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java	(revision 1636540)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java	(working copy)
@@ -66,7 +66,7 @@
 
     private final SegmentWriter writer;
 
-    private CompactionMap map = new CompactionMap(100000);
+    private final CompactionMap map = new CompactionMap(100000);
 
     /**
      * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted
@@ -75,8 +75,19 @@
      */
     private final Map<String, List<RecordId>> binaries = newHashMap();
 
+    /**
+     * Whether the compactor should copy large binaries as streams or just
+     * copy their references.
+     */
+    private final boolean cloneLargeBinaries;
+
     public Compactor(SegmentWriter writer) {
+        this(writer, false);
+    }
+
+    public Compactor(SegmentWriter writer, boolean cloneLargeBinaries) {
         this.writer = writer;
+        this.cloneLargeBinaries = cloneLargeBinaries;
     }
 
     protected SegmentNodeBuilder process(NodeState before, NodeState after) {
@@ -216,7 +227,7 @@
         try {
             // if the blob is inlined or external, just clone it
             if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) {
-                return sb.clone(writer);
+                return sb.clone(writer, cloneLargeBinaries);
             }
 
             // else check if we've already cloned this specific record
@@ -239,7 +250,7 @@
             }
 
             // if not, clone the blob and keep track of the result
-            sb = sb.clone(writer);
+            sb = sb.clone(writer, cloneLargeBinaries);
             map.put(id, sb.getRecordId());
             if (ids == null) {
                 ids = newArrayList();
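Note on the new cloneLargeBinaries flag: when enabled, compaction re-writes
large binaries through writeStream(), which severs references to
pre-compaction bulk segments at the cost of re-reading every large binary;
when disabled, only the block list record ids are copied (the previous
behavior). A hypothetical call site, assuming an existing SegmentWriter named
writer:

    // clone large binaries as streams: more I/O during compaction, but old
    // bulk segments become reclaimable afterwards
    Compactor cloning = new Compactor(writer, true);

    // copy only the block list record ids: equivalent to new Compactor(writer)
    Compactor referencing = new Compactor(writer, false);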
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/GCHeadGuard.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/GCHeadGuard.java	(revision 0)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/GCHeadGuard.java	(revision 0)
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import java.util.concurrent.Semaphore;
+
+public class GCHeadGuard {
+
+    private final Semaphore semaphore;
+
+    public GCHeadGuard(Semaphore semaphore) {
+        this.semaphore = semaphore;
+    }
+
+    public boolean tryAcquire() {
+        return semaphore == null || semaphore.tryAcquire();
+    }
+
+    public void release() {
+        if (semaphore != null) {
+            semaphore.release();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "GCHeadGuard [semaphore=" + semaphore + "]";
+    }
+
+}
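Note: the guard degrades to a no-op when constructed with null (tryAcquire()
then always succeeds), which is what FileStore uses until a SegmentNodeStore
installs the real guard. A usage sketch; GuardExample and the placeholder
body are illustrative, not Oak API:

    import java.util.concurrent.Semaphore;

    class GuardExample {

        void guardedUpdate() {
            GCHeadGuard guard = new GCHeadGuard(new Semaphore(1));
            if (guard.tryAcquire()) {
                try {
                    // critical section: e.g. move the head to the compacted state
                } finally {
                    guard.release();
                }
            } else {
                // a commit currently holds the semaphore; retry later
            }
        }
    }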
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java	(revision 1636540)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java	(working copy)
@@ -162,7 +162,7 @@
         }
     }
 
-    public SegmentBlob clone(SegmentWriter writer) throws IOException {
+    public SegmentBlob clone(SegmentWriter writer, boolean cloneLargeBinaries) throws IOException {
         Segment segment = getSegment();
         int offset = getOffset();
         byte head = segment.readByte(offset);
@@ -174,11 +174,18 @@
             return writer.writeStream(new BufferedInputStream(getNewStream()));
         } else if ((head & 0xe0) == 0xc0) {
             // 110x xxxx: long value
-            long length = (segment.readLong(offset) & 0x1fffffffffffffffL) + MEDIUM_LIMIT;
-            int listSize = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE);
-            ListRecord list = new ListRecord(
-                    segment.readRecordId(offset + 8), listSize);
-            return writer.writeLargeBlob(length, list.getEntries());
+            if (cloneLargeBinaries) {
+                return writer.writeStream(new BufferedInputStream(
+                        getNewStream()));
+            } else {
+                // this was the previous (default) behavior
+                long length = (segment.readLong(offset) & 0x1fffffffffffffffL)
+                        + MEDIUM_LIMIT;
+                int listSize = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE);
+                ListRecord list = new ListRecord(
+                        segment.readRecordId(offset + 8), listSize);
+                return writer.writeLargeBlob(length, list.getEntries());
+            }
         } else if ((head & 0xf0) == 0xe0) {
             // 1110 xxxx: external value
             return writer.writeExternalBlob(getBlobId());
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java	(revision 1636540)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java	(working copy)
@@ -46,6 +46,8 @@
 
     private final long lsb;
 
+    private final long creationTime;
+
     /**
      * A reference to the segment object, if it is available in memory. It is
      * used for fast lookup. The segment tracker will set or reset this field.
@@ -53,15 +55,16 @@
      */
     // TODO: possibly we could remove the volatile
     private volatile Segment segment;
 
-    public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) {
+    private SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment, long creationTime) {
         this.tracker = tracker;
         this.msb = msb;
         this.lsb = lsb;
         this.segment = segment;
+        this.creationTime = creationTime;
     }
 
     public SegmentId(SegmentTracker tracker, long msb, long lsb) {
-        this(tracker, msb, lsb, null);
+        this(tracker, msb, lsb, null, System.currentTimeMillis());
    }
 
    /**
@@ -117,6 +120,10 @@
         return tracker;
     }
 
+    public long getCreationTime() {
+        return creationTime;
+    }
+
     //--------------------------------------------------------< Comparable >--
 
     @Override
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java	(revision 1636540)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java	(working copy)
@@ -143,4 +143,21 @@
         return ((int) lsb) & (references.size() - 1);
     }
 
+    public synchronized void clearSegmentIdTables(CompactionStrategy strategy) {
+        int size = references.size();
+        boolean dirty = false;
+        for (int i = 0; i < size; i++) {
+            WeakReference<SegmentId> reference = references.get(i);
+            if (reference != null) {
+                SegmentId id = reference.get();
+                if (id != null && strategy.remove(id)) {
+                    references.set(i, null);
+                    dirty = true;
+                }
+            }
+        }
+        if (dirty) {
+            refresh();
+        }
+    }
 }
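Note: clearSegmentIdTables() above is the point where the strategy's remove()
callback decides which segment ids get dropped from the weak-reference table.
A minimal throwaway strategy showing the shape of that callback, built against
the CompactionStrategy interface introduced in this patch (DefaultCompactionStrategy
below is the real implementation; cutoff and dropOld are illustrative names):

    final long cutoff = System.currentTimeMillis() - 60 * 60 * 1000;
    CompactionStrategy dropOld = new CompactionStrategy() {
        @Override
        public boolean remove(SegmentId sid) {
            // drop every segment id created at or before the cut-off
            return sid.getCreationTime() <= cutoff;
        }
        @Override
        public boolean cloneBinaries() {
            return false;
        }
        @Override
        public boolean paused() {
            return false;
        }
    };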
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java	(revision 1636540)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java	(working copy)
@@ -39,6 +39,7 @@
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
@@ -84,12 +85,19 @@
      */
     private final Semaphore commitSemaphore = new Semaphore(1);
 
+    private final GCHeadGuard gcGuard = new GCHeadGuard(commitSemaphore);
+
     private long maximumBackoff = MILLISECONDS.convert(10, SECONDS);
 
     public SegmentNodeStore(SegmentStore store) {
         this.store = store;
         this.head = new AtomicReference<SegmentNodeState>(store.getHead());
         this.changeDispatcher = new ChangeDispatcher(getRoot());
+
+        // TODO: find a nicer way to wire the guard into the store
+        if (this.store instanceof FileStore) {
+            ((FileStore) this.store).setGcGuard(gcGuard);
+        }
     }
 
     public SegmentNodeStore() {
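Note: the constructor above shares the store's commitSemaphore with the
FileStore via the GCHeadGuard, so regular commits and compaction's setHead()
exclude each other. A wiring sketch with hypothetical values (directory name
and sizes are placeholders; exception handling elided):

    // FileStore(File, int cacheSizeMB, boolean memoryMapping), as used in
    // SegmentNodeStoreService below; the constructor may throw IOException
    FileStore fileStore = new FileStore(new File("repository"), 256, true);

    // the constructor detects the FileStore and installs the guard built
    // around its own commitSemaphore; no further wiring is needed here
    SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);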
"64".equals(mode); + int cacheSize = Integer.parseInt(size); if (customBlobStore) { log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore); - store = new FileStore(blobStore, new File(directory), - Integer.parseInt(size), memoryMapping) - .setPauseCompaction(pauseCompaction); + store = new FileStore(blobStore, new File(directory), cacheSize, + memoryMapping).setCompactionStrategy(compactionStrategy); } else { - store = new FileStore(new File(directory), Integer.parseInt(size), - memoryMapping).setPauseCompaction(pauseCompaction); + store = new FileStore(new File(directory), cacheSize, memoryMapping) + .setCompactionStrategy(compactionStrategy); } delegate = new SegmentNodeStore(store); Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (revision 1636540) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (working copy) @@ -74,7 +74,7 @@ * after compaction. */ private final AtomicReference compactionMap = - new AtomicReference(new CompactionMap(1)); + new AtomicReference(new CompactionMap(-1)); private final long cacheSize; @@ -116,9 +116,16 @@ } Segment getSegment(SegmentId id) { - Segment segment = store.readSegment(id); - setSegment(id, segment); - return segment; + try { + Segment segment = store.readSegment(id); + setSegment(id, segment); + return segment; + } catch (SegmentNotFoundException snfe) { + long delta = System.currentTimeMillis() - id.getCreationTime(); + log.error("SegmentNotFoundException creation date delta is {}", + delta); + throw snfe; + } } void setSegment(SegmentId id, Segment segment) { @@ -157,6 +164,7 @@ } public void setCompactionMap(CompactionMap compaction) { + compaction.setPrev(compactionMap.get()); compactionMap.set(compaction); } @@ -234,4 +242,9 @@ return getSegmentId(msb, lsb); } + public synchronized void clearSegmentIdTables(CompactionStrategy strategy) { + for (int i = 0; i < tables.length; i++) { + tables[i].clearSegmentIdTables(strategy); + } + } } Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/DefaultCompactionStrategy.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/DefaultCompactionStrategy.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/DefaultCompactionStrategy.java (revision 0) @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/DefaultCompactionStrategy.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/DefaultCompactionStrategy.java	(revision 0)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/DefaultCompactionStrategy.java	(revision 0)
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import org.apache.jackrabbit.oak.plugins.segment.CompactionStrategy;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+
+public class DefaultCompactionStrategy implements CompactionStrategy {
+
+    public static final CompactionStrategy DEFAULT = new DefaultCompactionStrategy(
+            true, false, "none", -1);
+
+    static enum CLEANUP {
+        all, none, timestamp
+    }
+
+    private boolean pauseCompaction;
+
+    private boolean cloneBinaries;
+
+    private CLEANUP cleanup;
+
+    /**
+     * Segments whose creation time is at or before this timestamp will be
+     * removed in {@code timestamp} mode. A value of 0 (or very small)
+     * effectively behaves like {@code CLEANUP.none}, a negative value
+     * behaves like {@code CLEANUP.all}.
+     */
+    private long olderThan;
+
+    public DefaultCompactionStrategy(boolean pauseCompaction,
+            boolean cloneBinaries, String cleanup, long olderThan) {
+        this.pauseCompaction = pauseCompaction;
+        this.cloneBinaries = cloneBinaries;
+        this.olderThan = olderThan;
+        setCleanup(cleanup);
+    }
+
+    @Override
+    public boolean remove(SegmentId sid) {
+        switch (cleanup) {
+        case all:
+            return true;
+        case none:
+            return false;
+        case timestamp:
+            if (olderThan >= 0) {
+                return sid.getCreationTime() <= olderThan;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean cloneBinaries() {
+        return this.cloneBinaries;
+    }
+
+    @Override
+    public boolean paused() {
+        return pauseCompaction;
+    }
+
+    void setPauseCompaction(boolean pauseCompaction) {
+        this.pauseCompaction = pauseCompaction;
+    }
+
+    void setCloneBinaries(boolean cloneBinaries) {
+        this.cloneBinaries = cloneBinaries;
+    }
+
+    void setCleanup(String cleanup) {
+        if (CLEANUP.none.toString().equals(cleanup)) {
+            this.cleanup = CLEANUP.none;
+        } else if (CLEANUP.all.toString().equals(cleanup)) {
+            this.cleanup = CLEANUP.all;
+        } else if (CLEANUP.timestamp.toString().equals(cleanup)) {
+            this.cleanup = CLEANUP.timestamp;
+        } else {
+            throw new IllegalArgumentException("Unknown cleanup strategy: "
+                    + cleanup);
+        }
+    }
+
+    void setOlderThan(long olderThan) {
+        this.olderThan = olderThan;
+    }
+
+    @Override
+    public String toString() {
+        return "DefaultCompactionStrategy [pauseCompaction=" + pauseCompaction
+                + ", cloneBinaries=" + cloneBinaries + ", cleanup=" + cleanup
+                + ", olderThan=" + olderThan + "]";
+    }
+}
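Note: in timestamp mode, remove() drops every segment created at or before
the olderThan cut-off. For example, a timestamp cleanup with a one-hour
cut-off (variable names are illustrative):

    long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000;
    CompactionStrategy strategy = new DefaultCompactionStrategy(
            false,        // pauseCompaction: let compaction run
            false,        // cloneBinaries: keep references to large binaries
            "timestamp",  // cleanup mode
            oneHourAgo);  // olderThan cut-off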
Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java	(revision 1636540)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java	(working copy)
@@ -51,9 +51,12 @@
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Maps;
+
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.plugins.segment.CompactionStrategy;
 import org.apache.jackrabbit.oak.plugins.segment.Compactor;
+import org.apache.jackrabbit.oak.plugins.segment.GCHeadGuard;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -134,17 +137,16 @@
      */
     private final BackgroundThread compactionThread;
 
+    private CompactionStrategy compaction = DefaultCompactionStrategy.DEFAULT;
+
+    private volatile GCHeadGuard gcGuard = new GCHeadGuard(null);
+
     /**
      * Flag to request revision cleanup during the next flush.
     */
     private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
 
     /**
-     * Flag to set the compaction on pause.
-     */
-    private volatile boolean pauseCompaction = true;
-
-    /**
      * List of old tar file generations that are waiting to be removed. They can
      * not be removed immediately, because they first need to be closed, and the
      * JVM needs to release the memory mapped file references.
@@ -272,10 +274,10 @@
                             estimate.getTotalSize(),
                             humanReadableByteCount(estimate.getReachableSize()),
                             humanReadableByteCount(estimate.getTotalSize()));
-                    if (!pauseCompaction) {
+                    if (!compaction.paused()) {
                         compact();
                     } else {
                         log.info("TarMK compaction paused");
                     }
                 } else {
                     log.info(
@@ -494,10 +499,10 @@
      */
     public void compact() {
         long start = System.nanoTime();
-        log.info("TarMK compaction running");
+        log.info("TarMK compaction running, strategy={}", compaction);
 
         SegmentWriter writer = new SegmentWriter(this, tracker);
-        Compactor compactor = new Compactor(writer);
+        Compactor compactor = new Compactor(writer, compaction.cloneBinaries());
 
         SegmentNodeState before = getHead();
         long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
@@ -510,27 +515,54 @@
         SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
         writer.flush();
-        while (!setHead(before, after)) {
+        while (!setHead(before, after, compactor)) {
             // Some other concurrent changes have been made.
             // Rebase (and compact) those changes on top of the
             // compacted state before retrying to set the head.
             SegmentNodeState head = getHead();
-            after = compactor.compact(before, head);
+            after = compactor.compact(after, head);
             before = head;
             writer.flush();
         }
-        tracker.setCompactionMap(compactor.getCompactionMap());
-
-        // Drop the SegmentWriter caches and flush any existing state
-        // in an attempt to prevent new references to old pre-compacted
-        // content. TODO: There should be a cleaner way to do this.
-        tracker.getWriter().dropCache();
-        tracker.getWriter().flush();
 
         log.info("TarMK compaction completed in {}ms", MILLISECONDS
                 .convert(System.nanoTime() - start, NANOSECONDS));
     }
 
+    private boolean setHead(SegmentNodeState before, SegmentNodeState after,
+            Compactor compactor) {
+        if (gcGuard.tryAcquire()) {
+            // Need to acquire SegmentNodeStore.commitSemaphore to prevent
+            // doing setHead while a commit is in progress. If we don't do
+            // this, SegmentNodeStore.Commit.prepare() might do a rebase using
+            // before and after states from *before* compaction, which leads
+            // to a rebased state referring to un-compacted states.
+            try {
+                boolean success = setHead(before, after);
+                if (success) {
+                    tracker.setCompactionMap(compactor.getCompactionMap());
+
+                    // Drop the SegmentWriter caches and flush any existing
+                    // state in an attempt to prevent new references to old
+                    // pre-compacted content. TODO: There should be a cleaner
+                    // way to do this. We need to do this inside the
+                    // commitSemaphore. Doing it before might result in the
+                    // new segment still referring to the un-compacted one.
+                    // Doing it afterwards opens a race where a new head,
+                    // which refers to the compacted head, might end up in
+                    // the old, un-compacted segment. Both cases cause the
+                    // un-compacted state to remain referenced from the
+                    // compacted one.
+                    tracker.getWriter().dropCache();
+                    tracker.getWriter().flush();
+                    tracker.clearSegmentIdTables(compaction);
+                }
+                return success;
+            } finally {
+                gcGuard.release();
+            }
+        } else {
+            return false;
+        }
+    }
+
     public synchronized Iterable<SegmentId> getSegmentIds() {
         List<SegmentId> ids = newArrayList();
         for (UUID uuid : writer.getUUIDs()) {
@@ -744,8 +776,12 @@
         return emptyMap();
     }
 
-    public FileStore setPauseCompaction(boolean pauseCompaction) {
-        this.pauseCompaction = pauseCompaction;
+    public FileStore setCompactionStrategy(CompactionStrategy strategy) {
+        this.compaction = strategy;
         return this;
     }
+
+    public void setGcGuard(GCHeadGuard gcGuard) {
+        this.gcGuard = gcGuard;
+    }
 }
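Note: one subtlety in compact() above is that a failed setHead now rebases the
current head onto the already-compacted after state (previously onto before),
so each retry only compacts the concurrent delta instead of re-compacting
everything. Schematically (excerpted from the method, types and surrounding
code elided):

    SegmentNodeState before = getHead();
    SegmentNodeState after = compactor.compact(EMPTY_NODE, before); // full pass
    while (!setHead(before, after, compactor)) {
        // concurrent commits moved the head; compact just the delta
        // onto the compacted state instead of starting from 'before'
        SegmentNodeState head = getHead();
        after = compactor.compact(after, head);
        before = head;
        writer.flush();
    }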
Index: src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java	(revision 1636540)
+++ src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java	(working copy)
@@ -38,7 +38,6 @@
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.plugins.segment.Compactor;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
@@ -51,7 +50,6 @@
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class CompactionAndCleanupTest {
@@ -67,13 +65,16 @@
     }
 
     @Test
-    @Ignore("OAK-2045")
-    public void compactionAndWeakReferenceMagic() throws Exception{
+    public void compactionAndWeakReferenceMagic() throws Exception {
         final int MB = 1024 * 1024;
         final int blobSize = 5 * MB;
 
+        DefaultCompactionStrategy custom = new DefaultCompactionStrategy(false,
+                false, "timestamp", System.currentTimeMillis());
+
         FileStore fileStore = new FileStore(directory, 1, 1, false);
         SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
+        fileStore.setCompactionStrategy(custom);
 
         //1. Create a property with 5 MB blob
         NodeBuilder builder = nodeStore.getRoot().builder();
@@ -114,16 +115,21 @@
         assertEquals(mb(fileStore.size()), 2 * mb(blobSize));
 
         //5. Cleanup
-        cleanup(fileStore);
+        fileStore.compact();
+        fileStore.cleanup();
 
         //Size is still double. Deleted space not reclaimed
         System.out.printf("File store post cleanup %d%n", mb(fileStore.size()));
+        // TODO assertEquals(mb(fileStore.size()), 2 * mb(blobSize));
 
         //6. Null out any hard reference
         ns1 = null;
         builder = null;
-        cleanup(fileStore);
+        // refresh the timestamp cut-off
+        custom.setOlderThan(System.currentTimeMillis());
+
+        fileStore.compact();
+        fileStore.cleanup();
 
         //Size should not come back to 5 and deleted data
         //space reclaimed
@@ -136,12 +145,5 @@
         FileUtils.deleteDirectory(directory);
     }
 
-    private static void cleanup(FileStore fileStore) throws IOException {
-        fileStore.getTracker().setCompactionMap(new Compactor(null).getCompactionMap());
-        fileStore.getTracker().getWriter().dropCache();
-
-        fileStore.cleanup();
-    }
-
     private static Blob createBlob(NodeStore nodeStore, int size) throws IOException {
         byte[] data = new byte[size];
@@ -193,7 +202,6 @@
         }
     }
 
-    @Ignore("OAK-2192") // FIXME OAK-2192
     @Test
     public void testMixedSegments() throws Exception {
         FileStore store = new FileStore(directory, 2, false);