Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java (date 1444201361000)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java (date 1444221828000)
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.util.UUID;
 
+import com.google.common.base.Supplier;
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.PrimitiveSink;
@@ -49,13 +50,22 @@
 
     private long reachableSize = 0;
 
-    CompactionGainEstimate(SegmentNodeState node, int estimatedBulkCount) {
+    /**
+     * Create a new instance of the gain estimator. The estimation process can be
+     * stopped by having the supplier {@code stop} return {@code true}, in which case
+     * the returned estimates are undefined.
+     *
+     * @param node root node state
+     * @param estimatedBulkCount expected number of bulk segments, used to size the Bloom filter
+     * @param stop stop signal
+     */
+    CompactionGainEstimate(SegmentNodeState node, int estimatedBulkCount, Supplier<Boolean> stop) {
         uuids = BloomFilter.create(UUID_FUNNEL, estimatedBulkCount);
-        collectReferencedSegments(node, new RecordIdSet());
+        collectReferencedSegments(node, new RecordIdSet(), stop);
     }
 
-    private void collectReferencedSegments(SegmentNodeState node, RecordIdSet visited) {
-        if (visited.addIfNotPresent(node.getRecordId())) {
+    private void collectReferencedSegments(SegmentNodeState node, RecordIdSet visited, Supplier<Boolean> stop) {
+        if (!stop.get() && visited.addIfNotPresent(node.getRecordId())) {
             collectUUID(node.getRecordId().getSegmentId());
             for (PropertyState property : node.getProperties()) {
                 if (property instanceof SegmentPropertyState) {
@@ -75,7 +85,7 @@
             }
             for (ChildNodeEntry child : node.getChildNodeEntries()) {
                 collectReferencedSegments((SegmentNodeState) child.getNodeState(),
-                        visited);
+                        visited, stop);
             }
         }
     }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (date 1444201361000)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (date 1444221828000)
@@ -16,6 +16,47 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Lists.newLinkedList;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import org.apache.jackrabbit.oak.api.Blob;
@@ -41,46 +82,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Maps.newLinkedHashMap;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.lang.String.format;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonMap;
-import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum;
-import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION;
-
 /**
  * The storage implementation for tar files.
  */
@@ -194,6 +195,11 @@
     private final AtomicBoolean sufficientDiskSpace;
 
     /**
+     * Flag signalling shutdown of the file store.
+     */
+    private volatile boolean shutdown;
+
+    /**
      * Create a new instance of a {@link Builder} for a file store.
      * @param directory directory where the tar files are stored
     * @return a new {@link Builder} instance.
@@ -456,7 +462,7 @@
         }
 
         if (!readonly) {
-            this.flushThread = new BackgroundThread(
+            flushThread = new BackgroundThread(
                     "TarMK flush thread [" + directory + "]", 5000, // 5s interval
                     new Runnable() {
                         @Override
@@ -469,7 +475,7 @@
                                }
                            }
                        }
                    });
-            this.compactionThread = new BackgroundThread(
+            compactionThread = new BackgroundThread(
                    "TarMK compaction thread [" + directory + "]", -1,
                    new Runnable() {
                        @Override
@@ -478,7 +484,8 @@
                        }
                    });
-            diskSpaceThread = new BackgroundThread("TarMK disk space check [" + directory + "]", TimeUnit.MINUTES.toMillis(1), new Runnable() {
+            diskSpaceThread = new BackgroundThread(
+                    "TarMK disk space check [" + directory + "]", MINUTES.toMillis(1), new Runnable() {
 
                 @Override
                 public void run() {
@@ -489,8 +496,8 @@
             approximateSize = new AtomicLong(size());
         } else {
-            this.flushThread = null;
-            this.compactionThread = null;
+            flushThread = null;
+            compactionThread = null;
             diskSpaceThread = null;
             approximateSize = null;
         }
 
@@ -506,7 +513,7 @@
     }
 
     public boolean maybeCompact(boolean cleanup) {
-        log.info("TarMK compaction started");
+        gcMonitor.info("TarMK compaction started");
 
         Runtime runtime = Runtime.getRuntime();
         long avail = runtime.totalMemory() - runtime.freeMemory();
@@ -538,7 +545,13 @@
         byte gainThreshold = compactionStrategy.getGainThreshold();
         boolean runCompaction = true;
         if (gainThreshold > 0) {
-            CompactionGainEstimate estimate = estimateCompactionGain();
+            Supplier<Boolean> shutdown = newShutdownSignal();
+            CompactionGainEstimate estimate = estimateCompactionGain(shutdown);
+            if (shutdown.get()) {
+                gcMonitor.info("Compaction estimation interrupted. Skipping compaction.");
+                return false;
+            }
+
             long gain = estimate.estimateCompactionGain(offset);
             runCompaction = gain >= gainThreshold;
             if (runCompaction) {
@@ -672,14 +685,22 @@
         return count;
     }
 
-    CompactionGainEstimate estimateCompactionGain() {
-        CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(),
-                count());
+    /**
+     * Estimate the compaction gain. The result is undefined if the estimation is
+     * stopped through the passed {@code stop} signal.
+     * @param stop signal for stopping the estimation process
+     * @return compaction gain estimate
+     */
+    CompactionGainEstimate estimateCompactionGain(Supplier<Boolean> stop) {
+        CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), count(), stop);
         synchronized (this) {
             for (TarReader reader : readers) {
                 reader.accept(estimate);
+                if (stop.get()) {
+                    break;
-            }
-        }
+                }
+            }
+        }
         return estimate;
     }
 
@@ -763,7 +784,7 @@
         }
         writer.collectReferences(referencedIds);
         for (TarReader reader : readers) {
-            cleaned.put(reader, null);
+            cleaned.put(reader, reader);
         }
     }
 
@@ -773,9 +794,12 @@
         LinkedList<File> toRemove = newLinkedList();
         Set<UUID> cleanedIds = newHashSet();
         for (TarReader reader : cleaned.keySet()) {
-            TarReader newReader = reader.cleanup(referencedIds, cm, cleanedIds);
-            cleaned.put(reader, newReader);
+            cleaned.put(reader, reader.cleanup(referencedIds, cm, cleanedIds));
+            if (shutdown) {
+                gcMonitor.info("TarMK revision cleanup interrupted");
+                break;
-        }
+            }
+        }
 
        List<TarReader> oldReaders = newArrayList();
        synchronized (this) {
@@ -832,32 +856,59 @@
      * space was considered insufficient at least once during compaction (or if
      * the space was never sufficient to begin with), compaction is considered
      * canceled.
+     * Furthermore, when the file store is shutting down, compaction is considered
+     * canceled.
      *
      * @return a flag indicating if compaction should be canceled.
     */
     private Supplier<Boolean> newCancelCompactionCondition() {
         return new Supplier<Boolean>() {
-            private boolean canceled = false;
+            private boolean outOfDiskSpace;
+            private boolean shutdown;
 
             @Override
             public Boolean get() {
-                // The canceled flag can only transition from false (its initial
-                // value), to true. Once compaction is considered canceled,
-                // there should be no way to go back.
-
+                // The outOfDiskSpace and shutdown flags can only transition from false (their initial
+                // values) to true. Once true, there should be no way to go back.
                 if (!sufficientDiskSpace.get()) {
-                    canceled = true;
+                    outOfDiskSpace = true;
                 }
+                if (FileStore.this.shutdown) {
+                    this.shutdown = true;
+                }
-                return canceled;
+                return shutdown || outOfDiskSpace;
             }
 
+            @Override
+            public String toString() {
+                if (outOfDiskSpace) {
+                    return "Not enough disk space available";
+                } else if (shutdown) {
+                    return "FileStore shutdown request received";
+                } else {
+                    return "";
+                }
+            }
         };
     }
 
     /**
+     * Returns a signal indicating that the file store is shutting down.
+     * @return a shutdown signal
+     */
+    private Supplier<Boolean> newShutdownSignal() {
+        return new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+                return shutdown;
+            }
+        };
+    }
+
+    /**
      * Copy every referenced record in data (non-bulk) segments. Bulk segments
      * are fully kept (they are only removed in cleanup, if there is no
      * reference to them).
@@ -881,7 +932,7 @@
             SegmentNodeState after = compactor.compact(EMPTY_NODE, before, EMPTY_NODE);
 
             if (compactionCanceled.get()) {
-                gcMonitor.warn("TarMK compaction was canceled, not enough disk space available.");
+                gcMonitor.warn("TarMK compaction was canceled: {}", compactionCanceled);
                 return;
             }
 
@@ -900,7 +951,7 @@
                 after = compactor.compact(before, head, after);
 
                 if (compactionCanceled.get()) {
-                    gcMonitor.warn("TarMK compaction was canceled, not enough disk space available.");
+                    gcMonitor.warn("TarMK compaction was canceled: {}", compactionCanceled);
                     return;
                 }
 
@@ -973,6 +1024,9 @@
 
     @Override
     public void close() {
+        // Flag the store as shutting down
+        shutdown = true;
+
         // avoid deadlocks by closing (and joining) the background
         // threads before acquiring the synchronization lock
         closeAndLogOnFail(compactionThread);
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java (date 1444201361000)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java (date 1444221828000)
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.util.Random;
 
+import com.google.common.base.Suppliers;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -87,7 +88,7 @@
         fileStore.flush();
         try {
             // should be at 66%
-            assertTrue(fileStore.estimateCompactionGain()
+            assertTrue(fileStore.estimateCompactionGain(Suppliers.ofInstance(false))
                     .estimateCompactionGain(0) > 60);
         } finally {
            fileStore.close();
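
For context: the patch threads a Supplier<Boolean> through the long-running estimation and
cleanup paths as a cooperative cancellation signal instead of interrupting threads. The
following sketch shows how a caller might drive the new
estimateCompactionGain(Supplier<Boolean>) API. It is illustrative only, not part of the
patch: it assumes the caller lives in the org.apache.jackrabbit.oak.plugins.segment.file
package (the method is package-private), and the class and method names are made up.

    // Illustrative sketch, not part of the patch above.
    package org.apache.jackrabbit.oak.plugins.segment.file;

    import java.util.concurrent.atomic.AtomicBoolean;

    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;

    class EstimateExample {

        static void estimate(FileStore store) {
            // A constant signal that never fires reproduces the old no-argument
            // behaviour; this is what the updated test uses.
            store.estimateCompactionGain(Suppliers.ofInstance(false));

            // A cooperative signal: another thread (e.g. a shutdown hook) flips
            // the flag and the estimation loop exits at its next check.
            final AtomicBoolean stopRequested = new AtomicBoolean();
            Supplier<Boolean> stop = new Supplier<Boolean>() {
                @Override
                public Boolean get() {
                    return stopRequested.get();
                }
            };
            CompactionGainEstimate estimate = store.estimateCompactionGain(stop);
            if (stop.get()) {
                // By the new contract the estimate is undefined once the signal
                // has fired, so it must be discarded.
                return;
            }
            long gain = estimate.estimateCompactionGain(0);
            System.out.println("Estimated compaction gain: " + gain + "%");
        }
    }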