Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java (working copy)
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+
+class CancelableDiff implements NodeStateDiff {
+
+ private final NodeStateDiff delegate;
+
+ private volatile boolean cancel = false;
+
+ public CancelableDiff(NodeStateDiff delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public final boolean propertyAdded(PropertyState after) {
+ if (wasCanceled()) {
+ return false;
+ } else {
+ return delegate.propertyAdded(after);
+ }
+ }
+
+ @Override
+ public final boolean propertyChanged(PropertyState before, PropertyState after) {
+ if (wasCanceled()) {
+ return false;
+ } else {
+ return delegate.propertyChanged(before, after);
+ }
+ }
+
+ @Override
+ public final boolean propertyDeleted(PropertyState before) {
+ if (wasCanceled()) {
+ return false;
+ } else {
+ return delegate.propertyDeleted(before);
+ }
+ }
+
+ @Override
+ public final boolean childNodeAdded(String name, NodeState after) {
+ if (wasCanceled()) {
+ return false;
+ } else {
+ return delegate.childNodeAdded(name, after);
+ }
+ }
+
+ @Override
+ public final boolean childNodeChanged(String name, NodeState before, NodeState after) {
+ if (wasCanceled()) {
+ return false;
+ } else {
+ return delegate.childNodeChanged(name, before, after);
+ }
+ }
+
+ @Override
+ public final boolean childNodeDeleted(String name, NodeState before) {
+ if (wasCanceled()) {
+ return false;
+ } else {
+ return delegate.childNodeDeleted(name, before);
+ }
+ }
+
+ public boolean wasCanceled() {
+ return cancel;
+ }
+
+ public void cancel() {
+ cancel = true;
+ }
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (revision 1706358)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (working copy)
@@ -88,6 +88,12 @@
*/
private final boolean cloneBinaries;
+ /**
+ * A reference to the cancelable diff that is currently performing a
+ * compaction operation.
+ */
+ private volatile CancelableDiff diff;
+
public Compactor(SegmentWriter writer) {
this.writer = writer;
this.map = new InMemoryCompactionMap(writer.getTracker());
@@ -109,11 +115,50 @@
protected SegmentNodeBuilder process(NodeState before, NodeState after, NodeState onto) {
SegmentNodeBuilder builder = new SegmentNodeBuilder(writer.writeNode(onto), writer);
- after.compareAgainstBaseState(before, new CompactDiff(builder));
+ diff = new CancelableDiff(new CompactDiff(builder));
+ onBeforeProcess();
+ after.compareAgainstBaseState(before, diff);
return builder;
}
+ protected void onBeforeProcess() {
+ // Callback for testing purposes.
+ }
+
/**
+ * If {@code true}, the compaction process was canceled. In this case, the
+ * {@code SegmentNodeState} returned by {@link #compact(NodeState,
+ * NodeState)} and {@link #compact(NodeState, NodeState, NodeState)}
+ * contains only the changes that were compacted before the compaction
+ * process was canceled.
+ *
+ * @return {@code true} if the compaction process was canceled.
+ */
+ public boolean wasCanceled() {
+ CancelableDiff local = diff;
+
+ if (local == null) {
+ return false;
+ }
+
+ return local.wasCanceled();
+ }
+
+ /**
+ * Cancel the currently running compaction operation. If compaction is not
+ * running yet, this method is a no-op.
+ */
+ public void cancel() {
+ CancelableDiff local = diff;
+
+ if (local == null) {
+ return;
+ }
+
+ local.cancel();
+ }
+
+ /**
* Compact the differences between a {@code before} and a {@code after}
* on top of the {@code before} state.
*
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision 1706358)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (working copy)
@@ -293,4 +293,8 @@
this.offlineCompaction = offlineCompaction;
}
+ public boolean isDiskSpaceSufficient(long repositoryDiskSpace, long availableDiskSpace) {
+ return true;
+ }
+
}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1706358)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy)
@@ -149,6 +149,13 @@
*/
private final BackgroundThread compactionThread;
+ /**
+ * This background thread checks the available disk space, and compares it
+ * with the size of the repository. If the available disk space is not
+ * enough for compaction to run, the compaction thread is interrupted.
+ */
+ private final BackgroundThread diskSpaceCheckThread;
+
private CompactionStrategy compactionStrategy = NO_COMPACTION;
/**
@@ -174,6 +181,12 @@
private final GCMonitor gcMonitor;
/**
+ * The {@code Compactor} currently performing a compaction operation. It may
+ * be {@code null} and change over time.
+ */
+ private volatile Compactor compactor;
+
+ /**
* Create a new instance of a {@link Builder} for a file store.
* @param directory directory where the tar files are stored
* @return a new {@link Builder} instance.
@@ -457,9 +470,19 @@
maybeCompact(true);
}
});
+
+ this.diskSpaceCheckThread = new BackgroundThread("TarMK disk space check thread [" + directory + "]", 30 * 1000, new Runnable() {
+
+ @Override
+ public void run() {
+ checkDiskSpace();
+ }
+
+ });
} else {
this.flushThread = null;
this.compactionThread = null;
+ this.diskSpaceCheckThread = null;
}
if (readonly) {
@@ -492,6 +515,21 @@
return false;
}
+ long repositoryDiskSpace = size();
+ long avaiableDiskSpace = directory.getFreeSpace();
+
+ if (!compactionStrategy.isDiskSpaceSufficient(repositoryDiskSpace, avaiableDiskSpace)) {
+ gcMonitor.skipped("Not enough disk space available ({}), if compared with current repository size ({}), skipping compaction",
+ humanReadableByteCount(avaiableDiskSpace),
+ humanReadableByteCount(repositoryDiskSpace));
+
+ if (cleanup) {
+ cleanupNeeded.set(true);
+ }
+
+ return false;
+ }
+
Stopwatch watch = Stopwatch.createStarted();
compactionStrategy.setCompactionStart(System.currentTimeMillis());
boolean compacted = false;
@@ -777,7 +815,7 @@
gcMonitor.info("TarMK compaction running, strategy={}", compactionStrategy);
long start = System.currentTimeMillis();
- Compactor compactor = new Compactor(this, compactionStrategy);
+ compactor = new Compactor(this, compactionStrategy);
SegmentNodeState before = getHead();
long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
.getChildNodeCount(Long.MAX_VALUE);
@@ -789,6 +827,11 @@
SegmentNodeState after = compactor.compact(EMPTY_NODE, before, EMPTY_NODE);
+ if (compactor.wasCanceled()) {
+ gcMonitor.warn("TarMK compaction was canceled.");
+ return;
+ }
+
Callable setHead = new SetHead(before, after, compactor);
try {
int cycles = 0;
@@ -802,6 +845,12 @@
"Compacting these commits. Cycle {}", cycles);
SegmentNodeState head = getHead();
after = compactor.compact(before, head, after);
+
+ if (compactor.wasCanceled()) {
+ gcMonitor.warn("TarMK compaction was canceled.");
+ break;
+ }
+
before = head;
setHead = new SetHead(head, after, compactor);
}
@@ -875,6 +924,7 @@
// threads before acquiring the synchronization lock
closeAndLogOnFail(compactionThread);
closeAndLogOnFail(flushThread);
+ closeAndLogOnFail(diskSpaceCheckThread);
synchronized (this) {
try {
flush();
@@ -1093,6 +1143,27 @@
persistedHead.set(id);
}
+ private void checkDiskSpace() {
+ long repositoryDiskSpace = size();
+ long availableDiskSpace = directory.getFreeSpace();
+
+ if (compactionStrategy.isDiskSpaceSufficient(repositoryDiskSpace, availableDiskSpace)) {
+ return;
+ }
+
+ log.warn("Available disk space ({}) too low, if compared to current repository size ({})",
+ humanReadableByteCount(availableDiskSpace),
+ humanReadableByteCount(repositoryDiskSpace));
+
+ Compactor local = compactor;
+
+ if (local == null) {
+ return;
+ }
+
+ local.cancel();
+ }
+
/**
* A read only {@link FileStore} implementation that supports
* going back to old revisions.
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (revision 1706358)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (working copy)
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("5.0.0")
+@Version("5.1.0")
@Export(optional = "provide:=true")
package org.apache.jackrabbit.oak.plugins.segment;
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java (working copy)
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class CancelableDiffTest {
+
+ @Test
+ public void testPropertyAddedInterruptible() throws Throwable {
+ PropertyState after = mock(PropertyState.class);
+
+ NodeStateDiff wrapped = mock(NodeStateDiff.class);
+ doReturn(true).when(wrapped).propertyAdded(after);
+
+ CancelableDiff diff = new CancelableDiff(wrapped);
+
+ assertTrue(diff.propertyAdded(after));
+ assertFalse(diff.wasCanceled());
+
+ diff.cancel();
+
+ assertFalse(diff.propertyAdded(after));
+ assertTrue(diff.wasCanceled());
+ }
+
+ @Test
+ public void testPropertyChangedInterruptible() throws Throwable {
+ PropertyState before = mock(PropertyState.class);
+ PropertyState after = mock(PropertyState.class);
+
+ NodeStateDiff wrapped = mock(NodeStateDiff.class);
+ doReturn(true).when(wrapped).propertyChanged(before, after);
+
+ CancelableDiff diff = new CancelableDiff(wrapped);
+
+ assertTrue(diff.propertyChanged(before, after));
+ assertFalse(diff.wasCanceled());
+
+ diff.cancel();
+
+ assertFalse(diff.propertyChanged(before, after));
+ assertTrue(diff.wasCanceled());
+ }
+
+ @Test
+ public void testPropertyDeletedInterruptible() throws Throwable {
+ PropertyState before = mock(PropertyState.class);
+
+ NodeStateDiff wrapped = mock(NodeStateDiff.class);
+ doReturn(true).when(wrapped).propertyDeleted(before);
+
+ CancelableDiff diff = new CancelableDiff(wrapped);
+
+ assertTrue(diff.propertyDeleted(before));
+ assertFalse(diff.wasCanceled());
+
+ diff.cancel();
+
+ assertFalse(diff.propertyDeleted(before));
+ assertTrue(diff.wasCanceled());
+ }
+
+ @Test
+ public void testChildNodeAddedInterruptible() throws Throwable {
+ NodeState after = mock(NodeState.class);
+
+ NodeStateDiff wrapped = mock(NodeStateDiff.class);
+ doReturn(true).when(wrapped).childNodeAdded("name", after);
+
+ CancelableDiff diff = new CancelableDiff(wrapped);
+
+ assertTrue(diff.childNodeAdded("name", after));
+ assertFalse(diff.wasCanceled());
+
+ diff.cancel();
+
+ assertFalse(diff.childNodeAdded("name", after));
+ assertTrue(diff.wasCanceled());
+ }
+
+ @Test
+ public void testChildNodeChangedInterruptible() throws Throwable {
+ NodeState before = mock(NodeState.class);
+ NodeState after = mock(NodeState.class);
+
+ NodeStateDiff wrapped = mock(NodeStateDiff.class);
+ doReturn(true).when(wrapped).childNodeChanged("name", before, after);
+
+ CancelableDiff diff = new CancelableDiff(wrapped);
+
+ assertTrue(diff.childNodeChanged("name", before, after));
+ assertFalse(diff.wasCanceled());
+
+ diff.cancel();
+
+ assertFalse(diff.childNodeChanged("name", before, after));
+ assertTrue(diff.wasCanceled());
+ }
+
+ @Test
+ public void testChildNodeDeletedInterruptible() throws Throwable {
+ NodeState before = mock(NodeState.class);
+
+ NodeStateDiff wrapped = mock(NodeStateDiff.class);
+ doReturn(true).when(wrapped).childNodeDeleted("name", before);
+
+ CancelableDiff diff = new CancelableDiff(wrapped);
+
+ assertTrue(diff.childNodeDeleted("name", before));
+ assertFalse(diff.wasCanceled());
+
+ diff.cancel();
+
+ assertFalse(diff.childNodeDeleted("name", before));
+ assertTrue(diff.wasCanceled());
+ }
+
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (revision 1706358)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (working copy)
@@ -17,7 +17,6 @@
package org.apache.jackrabbit.oak.plugins.segment;
import junit.framework.Assert;
-
import org.apache.jackrabbit.oak.Oak;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore;
@@ -27,35 +26,105 @@
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class CompactorTest {
+ private SegmentStore segmentStore;
+
+ @Before
+ public void openSegmentStore() {
+ segmentStore = new MemoryStore();
+ }
+
+ @After
+ public void closeSegmentStore() {
+ segmentStore.close();
+ }
+
@Test
public void testCompactor() throws Exception {
- MemoryStore source = new MemoryStore();
- try {
- NodeStore store = new SegmentNodeStore(source);
- init(store);
+ NodeStore store = new SegmentNodeStore(segmentStore);
+ init(store);
- Compactor compactor = new Compactor(source.getTracker().getWriter());
- addTestContent(store, 0);
+ Compactor compactor = new Compactor(segmentStore.getTracker().getWriter());
+ addTestContent(store, 0);
- NodeState initial = store.getRoot();
- SegmentNodeState after = compactor
- .compact(initial, store.getRoot(), initial);
- Assert.assertEquals(store.getRoot(), after);
+ NodeState initial = store.getRoot();
+ SegmentNodeState after = compactor
+ .compact(initial, store.getRoot(), initial);
+ Assert.assertEquals(store.getRoot(), after);
- addTestContent(store, 1);
- after = compactor.compact(initial, store.getRoot(), initial);
- Assert.assertEquals(store.getRoot(), after);
+ addTestContent(store, 1);
+ after = compactor.compact(initial, store.getRoot(), initial);
+ Assert.assertEquals(store.getRoot(), after);
+ }
- } finally {
- source.close();
- }
+ @Test
+ public void testCancel() throws Throwable {
+ NodeStore store = SegmentNodeStore.newSegmentNodeStore(segmentStore).create();
+ // Wait for the Compactor to set itself up, and cancel its work before
+ // the CancelableDiff has a chance to execute. The early cancellation is
+ // also the reason why the returned SegmentNodeState doesn't have the
+ // child named "b".
+
+ final CountDownLatch onBeforeProcess = new CountDownLatch(1);
+
+ final Compactor compactor = new Compactor(segmentStore.getTracker().getWriter()) {
+
+ @Override
+ protected void onBeforeProcess() {
+ onBeforeProcess.countDown();
+ }
+
+ };
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ onBeforeProcess.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ compactor.cancel();
+ }
+
+ });
+
+ SegmentNodeState sns = compactor.compact(store.getRoot(), addChild(store.getRoot(), "b"), store.getRoot());
+
+ // If the interrupted flag of the current thread is set, compaction
+ // should stop as soon as possible, and the compactor should signal an
+ // external interruption. The returned SegmentNodeState may not contain
+ // every compacted change.
+
+ assertTrue(compactor.wasCanceled());
+ assertFalse(sns.hasChildNode("b"));
+
+ executor.shutdown();
}
+ private NodeState addChild(NodeState current, String name) {
+ NodeBuilder builder = current.builder();
+ builder.child(name);
+ return builder.getNodeState();
+ }
+
private static void init(NodeStore store) {
new Oak(store).with(new OpenSecurityProvider())
.createContentRepository();