Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (working copy) @@ -77,6 +77,8 @@ private SegmentId[] afterSegmentIds = new SegmentId[0]; private short[] afterOffsets = new short[0]; + private CompactionMap prev; + CompactionMap(int compressInterval) { this.compressInterval = compressInterval; } @@ -90,7 +92,40 @@ * @return whether {@code before} was compacted to {@code after} */ boolean wasCompactedTo(RecordId before, RecordId after) { - return after.equals(get(before)); + return recursiveWasCompactedTo(before, after); + } + + /** + * Checks whether {@code before} was compacted to {@code after}, walking down the chain of {@code prev} maps to cover intermediate compaction steps. + * + * @param before before record identifier + * @param after after record identifier + * @return whether {@code before} was compacted to {@code after} + */ + private boolean recursiveWasCompactedTo(RecordId before, + RecordId after) { + RecordId potentialAfter = recursiveGet(this, before); + if (potentialAfter == null) { + return false; + } + if (after.equals(potentialAfter)) { + return true; + } + return recursiveWasCompactedTo(potentialAfter, after); + } + + private RecordId recursiveGet(CompactionMap map, RecordId before) { + if (map.isDisabled()) { + return null; + } + RecordId after = map.get(before); + if (after != null) { + return after; + } + if (map.prev != null) { + return recursiveGet(map.prev, before); + } + return null; } /** @@ -100,13 +135,28 @@ * @param id segment identifier * @return whether the identified segment was compacted */ - boolean wasCompacted(SegmentId id) { + public boolean wasCompacted(UUID id) { long msb = id.getMostSignificantBits(); long lsb = id.getLeastSignificantBits(); - return findEntry(msb, lsb) != -1; + return wasCompacted(this, msb, lsb); } - public RecordId get(RecordId before) { + private static boolean wasCompacted(CompactionMap map, long msb, long lsb) { + int find = map.findEntry(msb, lsb); + if (find != -1) { + return true; + } + if (map.prev != null) { + return wasCompacted(map.prev, msb, lsb); + } + return false; + } + + RecordId get(RecordId before) { + if (isDisabled()) { + return null; + } + RecordId after = recent.get(before); if (after != null) { return after; @@ -142,7 +192,13 @@ * added entry is not supported. 
*/ void put(RecordId before, RecordId after) { - assert get(before) == null; + if (isDisabled()) { + return; + } + // assert get(before) == null; + if (get(before) != null) { + throw new IllegalArgumentException(); + } recent.put(before, after); if (recent.size() >= compressInterval) { compress(); @@ -150,6 +206,10 @@ } void compress() { + if (recent.isEmpty()) { + // noop + return; + } Set uuids = newTreeSet(); Map> mapping = newTreeMap(); @@ -294,4 +354,20 @@ return -1; } + private boolean isDisabled() { + return compressInterval == -1; + } + + public void setPrev(CompactionMap prev) { + this.prev = prev; + this.prev.compress(); + } + + public CompactionMap getPrev() { + return prev; + } + + public int size() { + return msbs.length; + } } Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (working copy) @@ -52,11 +52,6 @@ private static final Logger log = LoggerFactory.getLogger(Compactor.class); /** - * over 64K in size, not will be included in the compaction map - */ - private static final long threshold = 65536; - - /** * Locks down the RecordId persistence structure */ static long[] recordAsKey(RecordId r) { @@ -66,7 +61,7 @@ private final SegmentWriter writer; - private CompactionMap map = new CompactionMap(100000); + private final CompactionMap map = new CompactionMap(100000); /** * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted @@ -143,7 +138,7 @@ if (success) { SegmentNodeState state = writer.writeNode(child.getNodeState()); builder.setChildNode(name, state); - if (id != null && includeInMap(state)) { + if (id != null) { map.put(id, state.getRecordId()); } } @@ -151,23 +146,6 @@ return success; } - private boolean includeInMap(SegmentNodeState state) { - if (state.getChildNodeCount(2) > 1) { - return true; - } - long count = 0; - for (PropertyState ps : state.getProperties()) { - for (int i = 0; i < ps.count(); i++) { - long size = ps.size(i); - count += size; - if (size >= threshold || count >= threshold) { - return true; - } - } - } - return false; - } - @Override public boolean childNodeChanged( String name, NodeState before, NodeState after) { @@ -182,13 +160,15 @@ } NodeBuilder child = builder.getChildNode(name); - boolean success = after.compareAgainstBaseState( - before, new CompactDiff(child)); + boolean success = after.compareAgainstBaseState(before, + new CompactDiff(child)); - if (success && id != null && child.getChildNodeCount(2) > 1) { - RecordId compactedId = - writer.writeNode(child.getNodeState()).getRecordId(); - map.put(id, compactedId); + if (success) { + RecordId compactedId = writer.writeNode(child.getNodeState()) + .getRecordId(); + if (id != null) { + map.put(id, compactedId); + } } return success; @@ -225,11 +205,6 @@ SegmentBlob sb = (SegmentBlob) blob; try { - // if the blob is inlined or external, just clone it - if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) { - return sb.clone(writer, cloneBinaries); - } - // else check if we've already cloned this specific record RecordId id = sb.getRecordId(); RecordId compactedId = map.get(id); @@ -237,6 +212,13 @@ return new SegmentBlob(compactedId); } + // if the blob is inlined or external, just clone it + if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) { + SegmentBlob clone = sb.clone(writer, cloneBinaries); 
+ map.put(id, clone.getRecordId()); + return clone; + } + // alternatively look if the exact same binary has been cloned String key = getBlobKey(blob); List ids = binaries.get(key); Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/GCHeadGuard.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/GCHeadGuard.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/GCHeadGuard.java (revision 0) @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import java.util.concurrent.Semaphore; + +public class GCHeadGuard { + + private final Semaphore semaphore; + + public GCHeadGuard(Semaphore semaphore) { + this.semaphore = semaphore; + } + + public boolean tryAcquire() { + return semaphore == null || semaphore.tryAcquire(); + } + + public void release() { + if (semaphore != null) { + semaphore.release(); + } + } + + @Override + public String toString() { + return "GCHeadGuard [semaphore=" + semaphore + "]"; + } + +} Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (working copy) @@ -46,6 +46,8 @@ private final long lsb; + private final long creationTime; + /** * A reference to the segment object, if it is available in memory. It is * used for fast lookup. The segment tracker will set or reset this field. 
@@ -53,15 +55,17 @@ // TODO: possibly we could remove the volatile private volatile Segment segment; - public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) { + private SegmentId(SegmentTracker tracker, long msb, long lsb, + Segment segment, long creationTime) { this.tracker = tracker; this.msb = msb; this.lsb = lsb; this.segment = segment; + this.creationTime = creationTime; } public SegmentId(SegmentTracker tracker, long msb, long lsb) { - this(tracker, msb, lsb, null); + this(tracker, msb, lsb, null, System.currentTimeMillis()); } /** @@ -79,7 +83,7 @@ * @return {@code true} for a bulk segment, {@code false} otherwise */ public boolean isBulkSegmentId() { - return (lsb >>> 60) == 0xBL; + return (lsb >>> 60) == 0xBL; } public boolean equals(long msb, long lsb) { @@ -117,18 +121,22 @@ return tracker; } - //--------------------------------------------------------< Comparable >-- + public long getCreationTime() { + return creationTime; + } + + // --------------------------------------------------------< Comparable >-- @Override public int compareTo(SegmentId that) { int d = Long.valueOf(this.msb).compareTo(Long.valueOf(that.msb)); if (d == 0) { - d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb)); + d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb)); } return d; } - //------------------------------------------------------------< Object >-- + // ------------------------------------------------------------< Object >-- @Override public String toString() { @@ -137,7 +145,13 @@ @Override public boolean equals(Object object) { - return this == object; + if (this == object) { + return true; + } else if (object instanceof SegmentId) { + SegmentId that = (SegmentId) object; + return msb == that.msb && lsb == that.lsb; + } + return false; } @Override Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java (working copy) @@ -25,6 +25,8 @@ import java.util.Collection; import java.util.Map; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; + /** * Hash table of weak references to segment identifiers. 
*/ @@ -143,4 +145,24 @@ return ((int) lsb) & (references.size() - 1); } + synchronized void clearSegmentIdTables(CompactionStrategy strategy) { + int size = references.size(); + boolean dirty = false; + for (int i = 0; i < size; i++) { + WeakReference reference = references.get(i); + if (reference != null) { + SegmentId id = reference.get(); + if (id != null) { + if (strategy.remove(id)) { + reference.clear(); + references.set(i, null); + dirty = true; + } + } + } + } + if (dirty) { + refresh(); + } + } } Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (working copy) @@ -39,6 +39,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher; @@ -84,12 +85,20 @@ */ private final Semaphore commitSemaphore = new Semaphore(1); + private final GCHeadGuard gcGuard; + private long maximumBackoff = MILLISECONDS.convert(10, SECONDS); public SegmentNodeStore(SegmentStore store) { this.store = store; this.head = new AtomicReference(store.getHead()); this.changeDispatcher = new ChangeDispatcher(getRoot()); + this.gcGuard = new GCHeadGuard(commitSemaphore); + + // TODO find a nicer way? + if (this.store instanceof FileStore) { + ((FileStore) this.store).setGcGuard(gcGuard); + } } public SegmentNodeStore() { Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (working copy) @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean; +import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; import java.io.Closeable; @@ -32,6 +33,7 @@ import org.apache.felix.scr.annotations.ConfigurationPolicy; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.PropertyOption; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; @@ -41,6 +43,9 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean; +import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategy; +import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import 
org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; @@ -80,11 +85,23 @@ @Property(description="Cache size (MB)", intValue=256) public static final String CACHE = "cache"; - @Property(description = "TarMK compaction paused flag", boolValue = true) + @Property(description = "TarMK compaction clone binaries flag", boolValue = true) + public static final String COMPACTION_CLONE_BINARIES = "compaction.cloneBinaries"; + + @Property(options = { @PropertyOption(name = "all", value = "all"), + @PropertyOption(name = "none", value = "none"), + @PropertyOption(name = "timestamp", value = "timestamp") }, value = "timestamp") + public static final String COMPACTION_CLEANUP = "compaction.cleanup"; + + @Property(description = "TarMK compaction strategy timestamp older (ms)", longValue = 1000 * 60 * 5) + public static final String COMPACTION_CLEANUP_TIMESTAMP = "compaction.cleanup.timestamp"; + + @Property(description = "TarMK compaction paused flag", boolValue = false) public static final String PAUSE_COMPACTION = "pauseCompaction"; @Property(description = "Flag indicating that this component will not register as a NodeStore but just as a NodeStoreProvider", boolValue = false) public static final String STANDBY = "standby"; + /** * Boolean value indicating a blobStore is to be used */ @@ -110,6 +127,7 @@ private ServiceRegistration providerRegistration; private Registration revisionGCRegistration; private Registration blobGCRegistration; + private Registration compactionStrategyRegistration; private WhiteboardExecutor executor; private boolean customBlobStore; @@ -159,16 +177,28 @@ size = System.getProperty(SIZE, "256"); } - boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION), true); + boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION), + false); + boolean cloneBinaries = toBoolean( + lookup(context, COMPACTION_CLONE_BINARIES), true); + long cleanupTs = toLong(lookup(context, COMPACTION_CLEANUP_TIMESTAMP), + -1); + String cleanup = lookup(context, COMPACTION_CLEANUP); + if (cleanup == null) { + cleanup = "none"; + } + DefaultCompactionStrategy compactionStrategy = new DefaultCompactionStrategy( + pauseCompaction, cloneBinaries, cleanup, cleanupTs); + boolean memoryMapping = "64".equals(mode); + int cacheSize = Integer.parseInt(size); if (customBlobStore) { log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore); - store = new FileStore(blobStore, new File(directory), - Integer.parseInt(size), memoryMapping) - .setPauseCompaction(pauseCompaction); + store = new FileStore(blobStore, new File(directory), cacheSize, + memoryMapping).setCompactionStrategy(compactionStrategy); } else { - store = new FileStore(new File(directory), Integer.parseInt(size), - memoryMapping).setPauseCompaction(pauseCompaction); + store = new FileStore(new File(directory), cacheSize, memoryMapping) + .setCompactionStrategy(compactionStrategy); } delegate = new SegmentNodeStore(store); @@ -213,6 +243,12 @@ BlobGCMBean.TYPE, "Segment node store blob garbage collection"); } + compactionStrategyRegistration = registerMBean(whiteboard, + CompactionStrategyMBean.class, + new DefaultCompactionStrategyMBean(compactionStrategy), + CompactionStrategyMBean.TYPE, + "Segment node store compaction strategy settings"); + log.info("SegmentNodeStore initialized"); } @@ -264,6 +300,10 @@ blobGCRegistration.unregister(); blobGCRegistration = null; } + if (compactionStrategyRegistration != null) { + 
compactionStrategyRegistration.unregister(); + compactionStrategyRegistration = null; + } if (executor != null) { executor.stop(); executor = null; Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (working copy) @@ -30,6 +30,7 @@ import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,7 @@ * after compaction. */ private final AtomicReference compactionMap = - new AtomicReference(new CompactionMap(1)); + new AtomicReference(new CompactionMap(-1)); private final long cacheSize; @@ -116,9 +117,17 @@ } Segment getSegment(SegmentId id) { - Segment segment = store.readSegment(id); - setSegment(id, segment); - return segment; + try { + Segment segment = store.readSegment(id); + setSegment(id, segment); + return segment; + } catch (SegmentNotFoundException snfe) { + //TODO remove me + long delta = System.currentTimeMillis() - id.getCreationTime(); + log.error("Segment not found: {}. Creation date delta is {} ms.", + id, delta); + throw snfe; + } } void setSegment(SegmentId id, Segment segment) { @@ -157,11 +166,12 @@ } public void setCompactionMap(CompactionMap compaction) { + compaction.setPrev(compactionMap.get()); compactionMap.set(compaction); } @Nonnull - CompactionMap getCompactionMap() { + public CompactionMap getCompactionMap() { return compactionMap.get(); } @@ -209,6 +219,8 @@ } } + private SegmentIdLoaderListener listener = null; + /** * * @param msb @@ -217,7 +229,11 @@ */ public SegmentId getSegmentId(long msb, long lsb) { int index = ((int) msb) & (tables.length - 1); - return tables[index].getSegmentId(msb, lsb); + SegmentId s = tables[index].getSegmentId(msb, lsb); + if (listener != null) { + listener.loaded(s); + } + return s; } SegmentId newDataSegmentId() { @@ -234,4 +250,17 @@ return getSegmentId(msb, lsb); } + public synchronized void clearSegmentIdTables(CompactionStrategy strategy) { + for (int i = 0; i < tables.length; i++) { + tables[i].clearSegmentIdTables(strategy); + } + } + + public void setListener(SegmentIdLoaderListener listener) { + this.listener = listener; + } + + public static interface SegmentIdLoaderListener { + void loaded(SegmentId id); + } } Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision 0) @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.compaction; + +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker.SegmentIdLoaderListener; + +public interface CompactionStrategy extends SegmentIdLoaderListener { + + void setCompactionMap(CompactionMap compaction); + + void setCompactionStart(long ms); + + boolean remove(SegmentId id); + + boolean cloneBinaries(); + + boolean paused(); + +} Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java (revision 0) @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.compaction; + +import javax.management.openmbean.TabularData; + +public interface CompactionStrategyMBean { + + String TYPE = "CompactionStrategy"; + + boolean isCloneBinaries(); + + void setCloneBinaries(boolean cloneBinaries); + + boolean isPausedCompaction(); + + void setPausedCompaction(boolean pausedCompaction); + + public String getCleanupStrategy(); + + void setCleanupStrategy(String cleanup); + + public long getOlderThan(); + + void setOlderThan(long olderThan); + + TabularData getCompactionMapStats(); +} Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategy.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategy.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategy.java (revision 0) @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.compaction; + +import static java.lang.System.currentTimeMillis; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; + +public class DefaultCompactionStrategy implements CompactionStrategy { + + // + // - clean 'all' *must* be used in conjunction with 'cloneBinaries' + // otherwise segments can go away (SNFE) + // pros: best compaction results + // cons: larger repo size *during* compaction (2x), high chances that a + // currently running diff (aka observation) fails with SNFE + // + // - clean 'timestamp' with 'cloneBinaries' + // pros: better compaction results, not best + // cons: larger repo size *during* compaction (2x), if the time window is + // optimal, SNFEs can be avoided + // + // - clean 'timestamp' without 'cloneBinaries' + // pros: weakest compaction results, smaller size during compaction (1x + + // data-segments-size) + // cons: if the time window is optimal, SNFEs can be avoided + + public static final CompactionStrategy DEFAULT = new DefaultCompactionStrategy( + true, false, "none", -1); + + static enum CLEANUP { + all, none, timestamp + } + + private boolean pauseCompaction; + + private boolean cloneBinaries; + + private CLEANUP cleanup; + + /** + * anything that has a lifetime bigger than this will be removed. 
a value of + * 0 (or very small) acts like a CLEANUP.NONE, a value of -1 (or negative) + * acts like a CLEANUP.ALL + * + */ + private long olderThan; + + private CompactionMap compactionMap; + + private long compactionStart = currentTimeMillis(); + + public DefaultCompactionStrategy(boolean pauseCompaction, + boolean cloneBinaries, String cleanup, long olderThan) { + this.pauseCompaction = pauseCompaction; + this.cloneBinaries = cloneBinaries; + this.olderThan = olderThan; + setCleanup(cleanup); + } + + @Override + public boolean remove(SegmentId id) { + switch (cleanup) { + case all: + return true; + case none: + return false; + case timestamp: + if (olderThan >= 0) { + if (loaded.contains(id)) { + return true; + } + long life = compactionStart - id.getCreationTime(); + if (life > olderThan) { + return true; + } + return false; + } + return true; + } + return false; + } + + @Override + public boolean cloneBinaries() { + return this.cloneBinaries; + } + + @Override + public boolean paused() { + return pauseCompaction; + } + + public void setPauseCompaction(boolean pauseCompaction) { + this.pauseCompaction = pauseCompaction; + } + + public void setCloneBinaries(boolean cloneBinaries) { + this.cloneBinaries = cloneBinaries; + } + + public void setCleanup(String cleanup) { + if (CLEANUP.none.toString().equals(cleanup)) { + this.cleanup = CLEANUP.none; + } else if (CLEANUP.all.toString().equals(cleanup)) { + this.cleanup = CLEANUP.all; + } else if (CLEANUP.timestamp.toString().equals(cleanup)) { + this.cleanup = CLEANUP.timestamp; + } else { + throw new IllegalArgumentException("Unknown cleanup strategy."); + } + } + + public void setOlderThan(long olderThan) { + this.olderThan = olderThan; + } + + public void setCompactionMap(CompactionMap compaction) { + this.compactionMap = compaction; + } + + String getCleanup() { + return cleanup.toString(); + } + + long getOlderThan() { + return olderThan; + } + + CompactionMap getCompactionMap() { + return this.compactionMap; + } + + @Override + public String toString() { + return "DefaultCompactionStrategy [pauseCompaction=" + pauseCompaction + + ", cloneBinaries=" + cloneBinaries + ", cleanup=" + cleanup + + ", olderThan=" + olderThan + "]"; + } + + @Override + public void setCompactionStart(long ms) { + this.compactionStart = ms; + loaded = new HashSet(); + } + + private Set loaded = new HashSet(); + + @Override + public synchronized void loaded(SegmentId id) { + // TODO only allow this for the same-thread callers + loaded.add(id); + } +} Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java (revision 0) @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.compaction; + +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; + +public class DefaultCompactionStrategyMBean implements CompactionStrategyMBean { + + private final DefaultCompactionStrategy strategy; + + public DefaultCompactionStrategyMBean(DefaultCompactionStrategy strategy) { + this.strategy = strategy; + } + + @Override + public boolean isCloneBinaries() { + return strategy.cloneBinaries(); + } + + @Override + public void setCloneBinaries(boolean cloneBinaries) { + strategy.setCloneBinaries(cloneBinaries); + } + + @Override + public boolean isPausedCompaction() { + return strategy.paused(); + } + + @Override + public void setPausedCompaction(boolean pausedCompaction) { + strategy.setPauseCompaction(pausedCompaction); + } + + @Override + public String getCleanupStrategy() { + return strategy.getCleanup(); + } + + @Override + public void setCleanupStrategy(String cleanup) { + strategy.setCleanup(cleanup); + } + + @Override + public long getOlderThan() { + return strategy.getOlderThan(); + } + + @Override + public void setOlderThan(long olderThan) { + strategy.setOlderThan(olderThan); + } + + @Override + public TabularData getCompactionMapStats() { + TabularDataSupport tds; + try { + TabularType tt = new TabularType( + DefaultCompactionStrategyMBean.class.getName(), + "Compaction Map Stats", Stats.TYPE, new String[] { "#" }); + tds = new TabularDataSupport(tt); + if (strategy.getCompactionMap() != null) { + addMap(0, strategy.getCompactionMap(), tds); + } + } catch (OpenDataException e) { + throw new IllegalStateException(e); + } + return tds; + } + + private static void addMap(int i, CompactionMap map, TabularDataSupport tds) { + tds.put(Stats.toCompositeData(i, map)); + if (map.getPrev() != null) { + addMap(i + 1, map.getPrev(), tds); + } + } + + private static class Stats { + + static CompositeDataSupport toCompositeData(int index, CompactionMap map) { + return new Stats(index, map).toCompositeData(); + } + + static final String[] FIELD_NAMES = new String[] { "#", "instance", + "items" }; + + static final String[] FIELD_DESCRIPTIONS = new String[] { "Index", + "Instance", "Items in the map" }; + + @SuppressWarnings("rawtypes") + static final OpenType[] FIELD_TYPES = new OpenType[] { + SimpleType.INTEGER, SimpleType.STRING, SimpleType.INTEGER }; + + static final CompositeType TYPE = createCompositeType(); + + static CompositeType createCompositeType() { + try { + return new CompositeType(Stats.class.getName(), + "Composite data type for the compaction map", + Stats.FIELD_NAMES, Stats.FIELD_DESCRIPTIONS, + Stats.FIELD_TYPES); + } catch (OpenDataException e) { + throw new IllegalStateException(e); + 
} + } + + private final int index; + private final String instance; + private final int size; + + private Stats(int index, CompactionMap map) { + this.index = index; + this.instance = map.toString(); + this.size = map.size(); + } + + private CompositeDataSupport toCompositeData() { + Object[] values = new Object[] { index, instance, size }; + try { + return new CompositeDataSupport(TYPE, FIELD_NAMES, values); + } catch (OpenDataException e) { + throw new IllegalStateException(e); + } + } + } +} Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy) @@ -26,8 +26,6 @@ import static java.lang.String.format; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; @@ -53,7 +51,9 @@ import com.google.common.collect.Maps; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; import org.apache.jackrabbit.oak.plugins.segment.Compactor; +import org.apache.jackrabbit.oak.plugins.segment.GCHeadGuard; import org.apache.jackrabbit.oak.plugins.segment.RecordId; import org.apache.jackrabbit.oak.plugins.segment.Segment; import org.apache.jackrabbit.oak.plugins.segment.SegmentId; @@ -63,6 +63,8 @@ import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; +import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategy; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -134,17 +136,16 @@ */ private final BackgroundThread compactionThread; + private CompactionStrategy compaction = DefaultCompactionStrategy.DEFAULT; + + private volatile GCHeadGuard gcGuard = new GCHeadGuard(null); + /** * Flag to request revision cleanup during the next flush. */ private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false); /** - * Flag to set the compaction on pause. - */ - private volatile boolean pauseCompaction = true; - - /** * List of old tar file generations that are waiting to be removed. They can * not be removed immediately, because they first need to be closed, and the * JVM needs to release the memory mapped file references. 
@@ -260,39 +261,45 @@ new Runnable() { @Override public void run() { - log.info("TarMK compaction started"); - Stopwatch watch = Stopwatch.createStarted(); - CompactionGainEstimate estimate = estimateCompactionGain(); - long gain = estimate.estimateCompactionGain(); - if (gain >= 10) { - log.info( - "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction", - watch, gain, - estimate.getReachableSize(), - estimate.getTotalSize(), - humanReadableByteCount(estimate.getReachableSize()), - humanReadableByteCount(estimate.getTotalSize())); - if (!pauseCompaction) { - compact(); - } else { - log.info("TarMK compaction paused"); - } - } else { - log.info( - "Estimated compaction in {}ms, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now", - watch, gain, - estimate.getReachableSize(), - estimate.getTotalSize(), - humanReadableByteCount(estimate.getReachableSize()), - humanReadableByteCount(estimate.getTotalSize())); - } - cleanupNeeded.set(true); + maybeCompact(); } }); log.info("TarMK opened: {} (mmap={})", directory, memoryMapping); } + private void maybeCompact() { + log.info("TarMK compaction started"); + Stopwatch watch = Stopwatch.createStarted(); + tracker.setListener(compaction); + compaction.setCompactionStart(System.currentTimeMillis()); + + CompactionGainEstimate estimate = estimateCompactionGain(); + long gain = estimate.estimateCompactionGain(); + if (gain >= 10) { + log.info( + "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction", + watch, gain, estimate.getReachableSize(), + estimate.getTotalSize(), + humanReadableByteCount(estimate.getReachableSize()), + humanReadableByteCount(estimate.getTotalSize())); + if (!compaction.paused()) { + compact(); + } else { + log.info("TarMK compaction paused"); + } + } else { + log.info( + "Estimated compaction in {}ms, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now", + watch, gain, estimate.getReachableSize(), + estimate.getTotalSize(), + humanReadableByteCount(estimate.getReachableSize()), + humanReadableByteCount(estimate.getTotalSize())); + } + tracker.setListener(null); + cleanupNeeded.set(true); + } + static Map> collectFiles(File directory) throws IOException { Map> dataFiles = newHashMap(); @@ -464,10 +471,10 @@ } writer.cleanup(ids); - List list = - newArrayListWithCapacity(readers.size()); + CompactionMap cm = tracker.getCompactionMap(); + List list = newArrayListWithCapacity(readers.size()); for (TarReader reader : readers) { - TarReader cleaned = reader.cleanup(ids); + TarReader cleaned = reader.cleanup(ids, cm); if (cleaned == reader) { list.add(reader); } else { @@ -493,11 +500,11 @@ * reference to them). */ public void compact() { - long start = System.nanoTime(); - log.info("TarMK compaction running"); + log.info("TarMK compaction running, strategy={}", compaction); + long start = System.currentTimeMillis(); SegmentWriter writer = new SegmentWriter(this, tracker); - Compactor compactor = new Compactor(writer); + Compactor compactor = new Compactor(writer, compaction.cloneBinaries()); SegmentNodeState before = getHead(); long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS) @@ -510,25 +517,53 @@ SegmentNodeState after = compactor.compact(EMPTY_NODE, before); writer.flush(); - while (!setHead(before, after)) { + while (!setHead(before, after, compactor)) { // Some other concurrent changes have been made. // Rebase (and compact) those changes on top of the // compacted state before retrying to set the head. 
SegmentNodeState head = getHead(); - after = compactor.compact(before, head); + after = compactor.compact(after, head); before = head; writer.flush(); } - tracker.setCompactionMap(compactor.getCompactionMap()); + log.info("TarMK compaction completed in {}ms", + System.currentTimeMillis() - start); + } - // Drop the SegmentWriter caches and flush any existing state - // in an attempt to prevent new references to old pre-compacted - // content. TODO: There should be a cleaner way to do this. - tracker.getWriter().dropCache(); - tracker.getWriter().flush(); + private boolean setHead(SegmentNodeState before, SegmentNodeState after, + Compactor compactor) { + if (gcGuard.tryAcquire()) { + // Need to acquire SegmentNodeStore.commitSemaphore to prevent doing setHead + // while a commit is in progress. If we don't do this, SegmentNodeStore.Commit.prepare() + // might do a rebase using before and after states from *before* compaction, which + // leads to a rebased state referring to un-compacted states. + try { + boolean success = setHead(before, after); + if (success) { + CompactionMap cm = compactor.getCompactionMap(); + tracker.setCompactionMap(cm); + compaction.setCompactionMap(cm); - log.info("TarMK compaction completed in {}ms", MILLISECONDS - .convert(System.nanoTime() - start, NANOSECONDS)); + // Drop the SegmentWriter caches and flush any existing state + // in an attempt to prevent new references to old pre-compacted + // content. TODO: There should be a cleaner way to do this. + // We need to do this inside the commitSemaphore. Doing it before + // might result in the new segment still referring to the + // un-compacted one. Doing it afterwards opens a race where a new head, + // which refers to the compacted head, might end up in the old, + // un-compacted segment. Both cases cause the un-compacted state to + // remain referenced from the compacted one. 
+ tracker.getWriter().dropCache(); + tracker.getWriter().flush(); + tracker.clearSegmentIdTables(compaction); + } + return success; + } finally { + gcGuard.release(); + } + } else { + return false; + } } public synchronized Iterable getSegmentIds() { @@ -744,8 +779,12 @@ return emptyMap(); } - public FileStore setPauseCompaction(boolean pauseCompaction) { - this.pauseCompaction = pauseCompaction; + public FileStore setCompactionStrategy(CompactionStrategy strategy) { + this.compaction = strategy; return this; } + + public void setGcGuard(GCHeadGuard gcGuard) { + this.gcGuard = gcGuard; + } } Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (revision 1637657) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (working copy) @@ -46,6 +46,7 @@ import java.util.zip.CRC32; import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -616,7 +617,11 @@ * @return this (if the file is kept as is), or the new generation file, or * null if the file is fully garbage */ - synchronized TarReader cleanup(Set referencedIds) throws IOException { + synchronized TarReader cleanup(Set referencedIds, CompactionMap cm) throws IOException { + if (referencedIds.isEmpty()) { + return null; + } + Map> graph = null; if (this.graph != null) { graph = parseGraph(); @@ -643,16 +648,27 @@ // this segment is not referenced anywhere sorted[i] = null; } else { - size += getEntrySize(entry.size()); - count += 1; - if (isDataSegmentId(entry.lsb())) { + size += getEntrySize(entry.size()); + count += 1; + // this is a referenced data segment, so follow the graph if (graph != null) { List refids = graph.get( new UUID(entry.msb(), entry.lsb())); if (refids != null) { - referencedIds.addAll(refids); + for (UUID r : refids) { + if (isDataSegmentId(r.getLeastSignificantBits())) { + referencedIds.add(r); + } else { + if (cm != null && cm.wasCompacted(id)) { + // skip bulk compacted segment + // references + } else { + referencedIds.add(r); + } + } + } } } else { // a pre-compiled graph is not available, so read the @@ -664,11 +680,27 @@ int refcount = segment.get(pos + REF_COUNT_OFFSET) & 0xff; int refend = pos + 16 * (refcount + 1); for (int refpos = pos + 16; refpos < refend; refpos += 16) { - referencedIds.add(new UUID( - segment.getLong(refpos), - segment.getLong(refpos + 8))); + UUID r = new UUID(segment.getLong(refpos), + segment.getLong(refpos + 8)); + if (isDataSegmentId(r.getLeastSignificantBits())) { + referencedIds.add(r); + } else { + if (cm != null && cm.wasCompacted(id)) { + // skip bulk compacted segment references + } else { + referencedIds.add(r); + } + } } } + } else { + // bulk segments compaction check + if (cm != null && cm.wasCompacted(id)) { + sorted[i] = null; + } else { + size += getEntrySize(entry.size()); + count += 1; + } } } } Index: src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupIT.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupIT.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupIT.java (revision 0) @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.file; + +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategy; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class CompactionAndCleanupIT { + + private File directory; + + @Before + public void setUp() throws IOException { + directory = File.createTempFile("FileStoreTest", "dir", new File( + "target")); + directory.delete(); + directory.mkdir(); + } + + @After + public void cleanDir() throws IOException { + FileUtils.deleteDirectory(directory); + } + + private static Blob createBlob(NodeStore nodeStore, int size) + throws IOException { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return nodeStore.createBlob(new ByteArrayInputStream(data)); + } + + @Test + public void testFastCompareAgainstBaseStateAll() throws Exception { + fastCompareAgainstBaseState("all", -1, true); + } + + @Test + public void testFastCompareAgainstBaseStateTimestamp() throws Exception { + fastCompareAgainstBaseState("timestamp", 5, false); + } + + public void fastCompareAgainstBaseState(String strategy, long olderThan, + boolean cloneBinaries) throws Exception { + + final int MB = 1024 * 1024; + final int blobSize = 5 * MB; + + // As it turns out, you can use the 'all' + // strategy only if you clone binaries, + // otherwise, you copy by reference and the old binary segments will go + // away on cleanup => segmentnotfoundexceptions + + DefaultCompactionStrategy custom = new DefaultCompactionStrategy(false, + cloneBinaries, strategy, olderThan); + + FileStore fileStore = new FileStore(directory, 1, 1, false); + SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore); + fileStore.setCompactionStrategy(custom); + + NodeBuilder builder = nodeStore.getRoot().builder(); + NodeBuilder c = builder.child("content"); + c.setProperty("a1", createBlob(nodeStore, blobSize)); + c.setProperty("b", "foo"); + + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + SegmentNodeState head = 
(SegmentNodeState) nodeStore.getRoot() + .getChildNode("content"); + TimeUnit.SECONDS.sleep(2); + + for (int i = 0; i < 250; i++) { + // System.out.println("run #" + i); + fileStore.compact(); + fileStore.cleanup(); + + SegmentNodeState compact = (SegmentNodeState) nodeStore.getRoot() + .getChildNode("content"); + assertTrue(!compact.getRecordId().equals(head.getRecordId())); + try { + // assertTrue(head.wasCompactedTo(compact)); + // NPE if the diff is not shortcut via the compaction map + assertTrue(compact.compareAgainstBaseState(head, null)); + } catch (SegmentNotFoundException snfe) { + snfe.printStackTrace(); + Assert.fail("Failed run #" + i + ": " + snfe.getMessage()); + } + if (i % 5 == 0) { + head = compact; + } + } + } + +} Index: src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java (revision 1637657) +++ src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java (working copy) @@ -32,17 +32,18 @@ import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; -import org.apache.jackrabbit.oak.plugins.segment.Compactor; import org.apache.jackrabbit.oak.plugins.segment.SegmentId; import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.plugins.segment.SegmentPropertyState; +import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategy; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; @@ -51,7 +52,6 @@ import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class CompactionAndCleanupTest { @@ -67,68 +67,81 @@ } @Test - @Ignore("OAK-2045") - public void compactionAndWeakReferenceMagic() throws Exception{ + public void compactionAndWeakReferenceMagic() throws Exception { final int MB = 1024 * 1024; final int blobSize = 5 * MB; + // really long time span + DefaultCompactionStrategy custom = new DefaultCompactionStrategy(false, + false, "timestamp", TimeUnit.HOURS.toMillis(1)); + FileStore fileStore = new FileStore(directory, 1, 1, false); SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore); + fileStore.setCompactionStrategy(custom); - //1. Create a property with 5 MB blob + // 1. 
Create a property with 5 MB blob NodeBuilder builder = nodeStore.getRoot().builder(); builder.setProperty("a1", createBlob(nodeStore, blobSize)); builder.setProperty("b", "foo"); - //Keep a reference to this nodeState to simulate long - //running session - NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - - System.out.printf("File store pre removal %d%n", mb(fileStore.size())); - assertEquals(mb(fileStore.size()), mb(blobSize)); + // Keep a reference to this nodeState to simulate long + // running session + @SuppressWarnings("unused") + NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE, + CommitInfo.EMPTY); + // System.out.printf("File store pre removal %d%n", mb(fileStore.size())); + assertEquals(mb(blobSize), mb(fileStore.size())); - //2. Now remove the property + // 2. Now remove the property builder = nodeStore.getRoot().builder(); builder.removeProperty("a1"); nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - //Size remains same - System.out.printf("File store pre compaction %d%n", mb(fileStore.size())); - assertEquals(mb(fileStore.size()), mb(blobSize)); + // Size remains same + // System.out.printf("File store pre compaction %d%n", mb(fileStore.size())); + assertEquals(mb(blobSize), mb(fileStore.size())); - //3. Compact + // 3. Compact + custom.setCompactionStart(System.currentTimeMillis()); fileStore.compact(); + fileStore.cleanup(); - //Size still remains same - System.out.printf("File store post compaction %d%n", mb(fileStore.size())); - assertEquals(mb(fileStore.size()), mb(blobSize)); + // Size still remains same + // System.out.printf("File store post compaction %d%n", mb(fileStore.size())); + assertEquals(mb(blobSize), mb(fileStore.size())); - //4. Add some more property to flush the current TarWriter + // 4. Add some more property to flush the current TarWriter builder = nodeStore.getRoot().builder(); builder.setProperty("a2", createBlob(nodeStore, blobSize)); nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - //Size is double - System.out.printf("File store pre cleanup %d%n", mb(fileStore.size())); - assertEquals(mb(fileStore.size()), 2 * mb(blobSize)); + // Size is double + // System.out.printf("File store pre cleanup %d%n", mb(fileStore.size())); + assertEquals(2 * mb(blobSize), mb(fileStore.size())); - //5. Cleanup - cleanup(fileStore); + // 5. Cleanup + custom.setCompactionStart(System.currentTimeMillis()); + fileStore.compact(); + fileStore.cleanup(); - //Size is still double. Deleted space not reclaimed - System.out.printf("File store post cleanup %d%n", mb(fileStore.size())); - assertEquals(mb(fileStore.size()), 2 * mb(blobSize)); + // Size is still double. Deleted space not reclaimed + // System.out.printf("File store post cleanup %d%n", mb(fileStore.size())); + assertEquals(mb(blobSize), mb(fileStore.size())); - //6. Null out any hard reference + // 6. 
Null out any hard reference ns1 = null; builder = null; - cleanup(fileStore); + // refresh the ts ref + custom.setOlderThan(0); - //Size should not come back to 5 and deleted data - //space reclaimed - System.out.printf("File store post cleanup and nullification %d%n", mb(fileStore.size())); - assertEquals(mb(fileStore.size()), mb(blobSize)); + fileStore.compact(); + fileStore.cleanup(); + + // Size should not come back to 5 and deleted data + // space reclaimed + // System.out.printf("File store post cleanup and nullification %d%n", mb(fileStore.size())); + assertEquals(mb(blobSize), mb(fileStore.size())); } @After @@ -136,20 +149,14 @@ FileUtils.deleteDirectory(directory); } - private static void cleanup(FileStore fileStore) throws IOException { - fileStore.getTracker().setCompactionMap(new Compactor(null).getCompactionMap()); - fileStore.getTracker().getWriter().dropCache(); - - fileStore.cleanup(); - } - - private static Blob createBlob(NodeStore nodeStore, int size) throws IOException { + private static Blob createBlob(NodeStore nodeStore, int size) + throws IOException { byte[] data = new byte[size]; new Random().nextBytes(data); return nodeStore.createBlob(new ByteArrayInputStream(data)); } - private static long mb(long size){ + private static long mb(long size) { return size / (1024 * 1024); } @@ -193,7 +200,6 @@ } } - @Ignore("OAK-2192") // FIXME OAK-2192 @Test public void testMixedSegments() throws Exception { FileStore store = new FileStore(directory, 2, false); @@ -276,6 +282,4 @@ builder.setProperty("property-" + UUID.randomUUID().toString(), "value-" + UUID.randomUUID().toString()); } } - - }
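
Usage note (illustrative, not part of the patch): the sketch below shows one way the compaction strategy introduced above could be wired into a FileStore, following the pattern used in the new CompactionAndCleanupIT. The store directory, cache size and memory-mapping flag are placeholder values, and the example class itself is hypothetical.

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
    import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategy;
    import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;

    public class CompactionStrategyExample {

        public static void main(String[] args) throws Exception {
            // cleanup="timestamp" clears segment ids whose lifetime exceeds the
            // given window; "all" clears every tracked segment id (per the comment
            // in CompactionAndCleanupIT this is only safe together with
            // cloneBinaries); "none" disables the cleanup of segment id tables.
            DefaultCompactionStrategy strategy = new DefaultCompactionStrategy(
                    false,                          // pauseCompaction
                    true,                           // cloneBinaries
                    "timestamp",                    // cleanup mode: all | none | timestamp
                    TimeUnit.MINUTES.toMillis(5));  // olderThan, in ms

            // placeholder cache size (MB) and memory-mapping flag
            FileStore fileStore = new FileStore(
                    new File("target/segmentstore"), 256, true);
            fileStore.setCompactionStrategy(strategy);
            SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);

            // ... write and merge content through nodeStore ...

            // Compaction normally runs from the background compaction thread;
            // it can also be triggered explicitly, as the new IT does:
            fileStore.compact();
            fileStore.cleanup();

            fileStore.close();
        }
    }

In an OSGi deployment the same settings are exposed through the new SegmentNodeStoreService properties (compaction.cloneBinaries, compaction.cleanup, compaction.cleanup.timestamp, pauseCompaction) and can be adjusted at runtime via the registered CompactionStrategyMBean.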