Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (revision 1706189) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (working copy) @@ -16,18 +16,6 @@ */ package org.apache.jackrabbit.oak.plugins.segment; -import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Maps.newHashMap; -import static org.apache.jackrabbit.oak.api.Type.BINARIES; -import static org.apache.jackrabbit.oak.api.Type.BINARY; -import static org.apache.jackrabbit.oak.commons.PathUtils.concat; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.hash.Hashing; @@ -47,6 +35,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.newHashMap; +import static org.apache.jackrabbit.oak.api.Type.BINARIES; +import static org.apache.jackrabbit.oak.api.Type.BINARY; +import static org.apache.jackrabbit.oak.commons.PathUtils.concat; + /** * Tool for compacting segments. */ @@ -88,6 +88,12 @@ */ private final boolean cloneBinaries; + /** + * This flag is true if the thread running the compaction process received + * an interrupt. + */ + private boolean interrupted; + public Compactor(SegmentWriter writer) { this.writer = writer; this.map = new InMemoryCompactionMap(writer.getTracker()); @@ -109,11 +115,27 @@ protected SegmentNodeBuilder process(NodeState before, NodeState after, NodeState onto) { SegmentNodeBuilder builder = new SegmentNodeBuilder(writer.writeNode(onto), writer); - after.compareAgainstBaseState(before, new CompactDiff(builder)); + InterruptibleDiff diff = new InterruptibleDiff(new CompactDiff(builder)); + interrupted = false; + after.compareAgainstBaseState(before, diff); + interrupted = diff.wasInterrupted(); return builder; } /** + * If {@code true}, the compaction process was interrupted. In this case, + * the {@code SegmentNodeState} returned by {@link #compact(NodeState, + * NodeState)} and {@link #compact(NodeState, NodeState, NodeState)} + * contains only the changes that were compacted before the compaction + * process was interrupted. + * + * @return {@code true} if the compaction process was interrupted. + */ + public boolean wasInterrupted() { + return interrupted; + } + + /** * Compact the differences between a {@code before} and a {@code after} * on top of the {@code before} state. *
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java (working copy)
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+
+class InterruptibleDiff implements NodeStateDiff {
+
+ private final NodeStateDiff delegate;
+
+ private boolean interrupted = false;
+
+ public InterruptibleDiff(NodeStateDiff delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public final boolean propertyAdded(PropertyState after) {
+ if (isInterrupted()) {
+ return interrupted();
+ } else {
+ return delegate.propertyAdded(after);
+ }
+ }
+
+ @Override
+ public final boolean propertyChanged(PropertyState before, PropertyState after) {
+ if (isInterrupted()) {
+ return interrupted();
+ } else {
+ return delegate.propertyChanged(before, after);
+ }
+ }
+
+ @Override
+ public final boolean propertyDeleted(PropertyState before) {
+ if (isInterrupted()) {
+ return interrupted();
+ } else {
+ return delegate.propertyDeleted(before);
+ }
+ }
+
+ @Override
+ public final boolean childNodeAdded(String name, NodeState after) {
+ if (isInterrupted()) {
+ return interrupted();
+ } else {
+ return delegate.childNodeAdded(name, after);
+ }
+ }
+
+ @Override
+ public final boolean childNodeChanged(String name, NodeState before, NodeState after) {
+ if (isInterrupted()) {
+ return interrupted();
+ } else {
+ return delegate.childNodeChanged(name, before, after);
+ }
+ }
+
+ @Override
+ public final boolean childNodeDeleted(String name, NodeState before) {
+ if (isInterrupted()) {
+ return interrupted();
+ } else {
+ return delegate.childNodeDeleted(name, before);
+ }
+ }
+
+ public boolean wasInterrupted() {
+ return interrupted;
+ }
+
+ private boolean isInterrupted() {
+ return Thread.currentThread().isInterrupted();
+ }
+
+ private boolean interrupted() {
+ interrupted = true;
+ return false;
+ }
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision 1706189)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (working copy)
@@ -293,4 +293,8 @@
this.offlineCompaction = offlineCompaction;
}
+ public boolean isDiskSpaceSufficient(long repositoryDiskSpace, long availableDiskSpace) {
+ return true;
+ }
+
}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java (revision 1706189)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java (working copy)
@@ -16,13 +16,13 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.file;
-import static java.lang.System.currentTimeMillis;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Date;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.lang.System.currentTimeMillis;
/**
* A small wrapper around the Thread class that periodically calls a runnable.
@@ -62,13 +62,57 @@
@Override
public void run() {
- try {
- while (waitUntilNextIteration()) {
+ boolean alive = true;
+
+ boolean interrupted = false;
+
+ while (alive) {
+
+ // Wait until the next iteration occurs. The waiting can be
+ // interrupted. If this happens, we will restore the interrupted
+ // flag later for the Runnable to catch.
+
+ while (true) {
+ try {
+ onBeforeWait();
+ alive = waitUntilNextIteration();
+ break;
+ } catch (InterruptedException e) {
+ onInterrupt();
+ interrupted = true;
+ }
+ }
+
+ // At this point, the thread was notified or the waiting time
+ // expired. Additionally, the thread might have been interrupted.
+ // Interrupts don't terminate the thread, but the interrupted status
+ // may be checked by the Runnable to promptly cancel its activities.
+ // Termination of this thread is possible by calling the close()
+ // method.
+
+ if (alive) {
setName(name + ", active since " + new Date()
+ ", previous max duration " + maxDuration + "ms");
long start = currentTimeMillis();
- super.run();
+
+ // If the thread was interrupted, restore the interrupted flag
+ // of the thread, so the Runnable can act accordingly.
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+
+ // Invoke the runnable and, afterwards, clear the interrupted
+ // flag of the thread, so we can restart from a clean slate
+ // when waiting for the next iteration.
+
+ try {
+ super.run();
+ } finally {
+ interrupted = Thread.interrupted();
+ }
+
long duration = currentTimeMillis() - start;
iterations++;
@@ -80,9 +124,6 @@
+ ", avg " + (sumDuration / iterations) + "ms"
+ ", max " + maxDuration + "ms");
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error(name + " interrupted", e);
}
}
@@ -90,6 +131,14 @@
trigger(false);
}
+ protected void onBeforeWait() {
+ // Callback for testing purposes.
+ }
+
+ protected void onInterrupt() {
+ // Callback for testing purposes.
+ }
+
@Override
public void close() {
try {
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1706189)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy)
@@ -16,47 +16,8 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.file;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.lang.String.format;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonMap;
-import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum;
-import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nonnull;
-
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
-
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
@@ -80,6 +41,43 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Lists.newLinkedList;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION;
+
/**
* The storage implementation for tar files.
*/
@@ -149,6 +147,13 @@
*/
private final BackgroundThread compactionThread;
+ /**
+ * This background thread checks the available disk space, and compares it
+ * with the size of the repository. If the available disk space is not
+ * enough for compaction to run, the compaction thread is interrupted.
+ */
+ private final BackgroundThread diskSpaceCheckThread;
+
private CompactionStrategy compactionStrategy = NO_COMPACTION;
/**
@@ -457,9 +462,19 @@
maybeCompact(true);
}
});
+
+ this.diskSpaceCheckThread = new BackgroundThread("TarMK disk space check thread [" + directory + "]", 30 * 1000, new Runnable() {
+
+ @Override
+ public void run() {
+ checkDiskSpace();
+ }
+
+ });
} else {
this.flushThread = null;
this.compactionThread = null;
+ this.diskSpaceCheckThread = null;
}
if (readonly) {
@@ -492,6 +507,21 @@
return false;
}
+ long repositoryDiskSpace = size();
+ long avaiableDiskSpace = directory.getFreeSpace();
+
+ if (!compactionStrategy.isDiskSpaceSufficient(repositoryDiskSpace, avaiableDiskSpace)) {
+ gcMonitor.skipped("Not enough disk space available ({}), if compared with current repository size ({}), skipping compaction",
+ humanReadableByteCount(avaiableDiskSpace),
+ humanReadableByteCount(repositoryDiskSpace));
+
+ if (cleanup) {
+ cleanupNeeded.set(true);
+ }
+
+ return false;
+ }
+
Stopwatch watch = Stopwatch.createStarted();
compactionStrategy.setCompactionStart(System.currentTimeMillis());
boolean compacted = false;
@@ -789,6 +819,11 @@
SegmentNodeState after = compactor.compact(EMPTY_NODE, before, EMPTY_NODE);
+ if (compactor.wasInterrupted()) {
+ gcMonitor.warn("TarMK compaction was interrupted, transient changes will be discarded at next cleanup");
+ return;
+ }
+
Callable