Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java (working copy) @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; + +class CancelableDiff implements NodeStateDiff { + + private final NodeStateDiff delegate; + + private volatile boolean cancel = false; + + public CancelableDiff(NodeStateDiff delegate) { + this.delegate = delegate; + } + + @Override + public final boolean propertyAdded(PropertyState after) { + if (wasCanceled()) { + return false; + } else { + return delegate.propertyAdded(after); + } + } + + @Override + public final boolean propertyChanged(PropertyState before, PropertyState after) { + if (wasCanceled()) { + return false; + } else { + return delegate.propertyChanged(before, after); + } + } + + @Override + public final boolean propertyDeleted(PropertyState before) { + if (wasCanceled()) { + return false; + } else { + return delegate.propertyDeleted(before); + } + } + + @Override + public final boolean childNodeAdded(String name, NodeState after) { + if (wasCanceled()) { + return false; + } else { + return delegate.childNodeAdded(name, after); + } + } + + @Override + public final boolean childNodeChanged(String name, NodeState before, NodeState after) { + if (wasCanceled()) { + return false; + } else { + return delegate.childNodeChanged(name, before, after); + } + } + + @Override + public final boolean childNodeDeleted(String name, NodeState before) { + if (wasCanceled()) { + return false; + } else { + return delegate.childNodeDeleted(name, before); + } + } + + public boolean wasCanceled() { + return cancel; + } + + public void cancel() { + cancel = true; + } + +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiff.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (revision 1706358) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (working copy) @@ -88,6 +88,12 @@ */ private final boolean cloneBinaries; + /** + * A reference to the cancelable diff that is currently performing a + * compaction operation. + */ + private volatile CancelableDiff diff; + public Compactor(SegmentWriter writer) { this.writer = writer; this.map = new InMemoryCompactionMap(writer.getTracker()); @@ -109,11 +115,50 @@ protected SegmentNodeBuilder process(NodeState before, NodeState after, NodeState onto) { SegmentNodeBuilder builder = new SegmentNodeBuilder(writer.writeNode(onto), writer); - after.compareAgainstBaseState(before, new CompactDiff(builder)); + diff = new CancelableDiff(new CompactDiff(builder)); + onBeforeProcess(); + after.compareAgainstBaseState(before, diff); return builder; } + protected void onBeforeProcess() { + // Callback for testing purposes. + } + /** + * If {@code true}, the compaction process was canceled. In this case, the + * {@code SegmentNodeState} returned by {@link #compact(NodeState, + * NodeState)} and {@link #compact(NodeState, NodeState, NodeState)} + * contains only the changes that were compacted before the compaction + * process was canceled. + * + * @return {@code true} if the compaction process was canceled. + */ + public boolean wasCanceled() { + CancelableDiff local = diff; + + if (local == null) { + return false; + } + + return local.wasCanceled(); + } + + /** + * Cancel the currently running compaction operation. If compaction is not + * running yet, this method is a no-op. + */ + public void cancel() { + CancelableDiff local = diff; + + if (local == null) { + return; + } + + local.cancel(); + } + + /** * Compact the differences between a {@code before} and a {@code after} * on top of the {@code before} state. *

Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision 1706358) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (working copy) @@ -293,4 +293,8 @@ this.offlineCompaction = offlineCompaction; } + public boolean isDiskSpaceSufficient(long repositoryDiskSpace, long availableDiskSpace) { + return true; + } + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1706358) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy) @@ -149,6 +149,13 @@ */ private final BackgroundThread compactionThread; + /** + * This background thread checks the available disk space, and compares it + * with the size of the repository. If the available disk space is not + * enough for compaction to run, the compaction thread is interrupted. + */ + private final BackgroundThread diskSpaceCheckThread; + private CompactionStrategy compactionStrategy = NO_COMPACTION; /** @@ -174,6 +181,12 @@ private final GCMonitor gcMonitor; /** + * The {@code Compactor} currently performing a compaction operation. It may + * be {@code null} and change over time. + */ + private volatile Compactor compactor; + + /** * Create a new instance of a {@link Builder} for a file store. * @param directory directory where the tar files are stored * @return a new {@link Builder} instance. @@ -457,9 +470,19 @@ maybeCompact(true); } }); + + this.diskSpaceCheckThread = new BackgroundThread("TarMK disk space check thread [" + directory + "]", 30 * 1000, new Runnable() { + + @Override + public void run() { + checkDiskSpace(); + } + + }); } else { this.flushThread = null; this.compactionThread = null; + this.diskSpaceCheckThread = null; } if (readonly) { @@ -492,6 +515,21 @@ return false; } + long repositoryDiskSpace = size(); + long avaiableDiskSpace = directory.getFreeSpace(); + + if (!compactionStrategy.isDiskSpaceSufficient(repositoryDiskSpace, avaiableDiskSpace)) { + gcMonitor.skipped("Not enough disk space available ({}), if compared with current repository size ({}), skipping compaction", + humanReadableByteCount(avaiableDiskSpace), + humanReadableByteCount(repositoryDiskSpace)); + + if (cleanup) { + cleanupNeeded.set(true); + } + + return false; + } + Stopwatch watch = Stopwatch.createStarted(); compactionStrategy.setCompactionStart(System.currentTimeMillis()); boolean compacted = false; @@ -777,7 +815,7 @@ gcMonitor.info("TarMK compaction running, strategy={}", compactionStrategy); long start = System.currentTimeMillis(); - Compactor compactor = new Compactor(this, compactionStrategy); + compactor = new Compactor(this, compactionStrategy); SegmentNodeState before = getHead(); long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS) .getChildNodeCount(Long.MAX_VALUE); @@ -789,6 +827,11 @@ SegmentNodeState after = compactor.compact(EMPTY_NODE, before, EMPTY_NODE); + if (compactor.wasCanceled()) { + gcMonitor.warn("TarMK compaction was canceled."); + return; + } + Callable setHead = new SetHead(before, after, compactor); try { int cycles = 0; @@ -802,6 +845,12 @@ "Compacting these commits. Cycle {}", cycles); SegmentNodeState head = getHead(); after = compactor.compact(before, head, after); + + if (compactor.wasCanceled()) { + gcMonitor.warn("TarMK compaction was canceled."); + break; + } + before = head; setHead = new SetHead(head, after, compactor); } @@ -875,6 +924,7 @@ // threads before acquiring the synchronization lock closeAndLogOnFail(compactionThread); closeAndLogOnFail(flushThread); + closeAndLogOnFail(diskSpaceCheckThread); synchronized (this) { try { flush(); @@ -1093,6 +1143,27 @@ persistedHead.set(id); } + private void checkDiskSpace() { + long repositoryDiskSpace = size(); + long availableDiskSpace = directory.getFreeSpace(); + + if (compactionStrategy.isDiskSpaceSufficient(repositoryDiskSpace, availableDiskSpace)) { + return; + } + + log.warn("Available disk space ({}) too low, if compared to current repository size ({})", + humanReadableByteCount(availableDiskSpace), + humanReadableByteCount(repositoryDiskSpace)); + + Compactor local = compactor; + + if (local == null) { + return; + } + + local.cancel(); + } + /** * A read only {@link FileStore} implementation that supports * going back to old revisions. Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (revision 1706358) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (working copy) @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Version("5.0.0") +@Version("5.1.0") @Export(optional = "provide:=true") package org.apache.jackrabbit.oak.plugins.segment; Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java (working copy) @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class CancelableDiffTest { + + @Test + public void testPropertyAddedInterruptible() throws Throwable { + PropertyState after = mock(PropertyState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).propertyAdded(after); + + CancelableDiff diff = new CancelableDiff(wrapped); + + assertTrue(diff.propertyAdded(after)); + assertFalse(diff.wasCanceled()); + + diff.cancel(); + + assertFalse(diff.propertyAdded(after)); + assertTrue(diff.wasCanceled()); + } + + @Test + public void testPropertyChangedInterruptible() throws Throwable { + PropertyState before = mock(PropertyState.class); + PropertyState after = mock(PropertyState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).propertyChanged(before, after); + + CancelableDiff diff = new CancelableDiff(wrapped); + + assertTrue(diff.propertyChanged(before, after)); + assertFalse(diff.wasCanceled()); + + diff.cancel(); + + assertFalse(diff.propertyChanged(before, after)); + assertTrue(diff.wasCanceled()); + } + + @Test + public void testPropertyDeletedInterruptible() throws Throwable { + PropertyState before = mock(PropertyState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).propertyDeleted(before); + + CancelableDiff diff = new CancelableDiff(wrapped); + + assertTrue(diff.propertyDeleted(before)); + assertFalse(diff.wasCanceled()); + + diff.cancel(); + + assertFalse(diff.propertyDeleted(before)); + assertTrue(diff.wasCanceled()); + } + + @Test + public void testChildNodeAddedInterruptible() throws Throwable { + NodeState after = mock(NodeState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).childNodeAdded("name", after); + + CancelableDiff diff = new CancelableDiff(wrapped); + + assertTrue(diff.childNodeAdded("name", after)); + assertFalse(diff.wasCanceled()); + + diff.cancel(); + + assertFalse(diff.childNodeAdded("name", after)); + assertTrue(diff.wasCanceled()); + } + + @Test + public void testChildNodeChangedInterruptible() throws Throwable { + NodeState before = mock(NodeState.class); + NodeState after = mock(NodeState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).childNodeChanged("name", before, after); + + CancelableDiff diff = new CancelableDiff(wrapped); + + assertTrue(diff.childNodeChanged("name", before, after)); + assertFalse(diff.wasCanceled()); + + diff.cancel(); + + assertFalse(diff.childNodeChanged("name", before, after)); + assertTrue(diff.wasCanceled()); + } + + @Test + public void testChildNodeDeletedInterruptible() throws Throwable { + NodeState before = mock(NodeState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).childNodeDeleted("name", before); + + CancelableDiff diff = new CancelableDiff(wrapped); + + assertTrue(diff.childNodeDeleted("name", before)); + assertFalse(diff.wasCanceled()); + + diff.cancel(); + + assertFalse(diff.childNodeDeleted("name", before)); + assertTrue(diff.wasCanceled()); + } + +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CancelableDiffTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (revision 1706358) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (working copy) @@ -17,7 +17,6 @@ package org.apache.jackrabbit.oak.plugins.segment; import junit.framework.Assert; - import org.apache.jackrabbit.oak.Oak; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore; @@ -27,35 +26,105 @@ import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class CompactorTest { + private SegmentStore segmentStore; + + @Before + public void openSegmentStore() { + segmentStore = new MemoryStore(); + } + + @After + public void closeSegmentStore() { + segmentStore.close(); + } + @Test public void testCompactor() throws Exception { - MemoryStore source = new MemoryStore(); - try { - NodeStore store = new SegmentNodeStore(source); - init(store); + NodeStore store = new SegmentNodeStore(segmentStore); + init(store); - Compactor compactor = new Compactor(source.getTracker().getWriter()); - addTestContent(store, 0); + Compactor compactor = new Compactor(segmentStore.getTracker().getWriter()); + addTestContent(store, 0); - NodeState initial = store.getRoot(); - SegmentNodeState after = compactor - .compact(initial, store.getRoot(), initial); - Assert.assertEquals(store.getRoot(), after); + NodeState initial = store.getRoot(); + SegmentNodeState after = compactor + .compact(initial, store.getRoot(), initial); + Assert.assertEquals(store.getRoot(), after); - addTestContent(store, 1); - after = compactor.compact(initial, store.getRoot(), initial); - Assert.assertEquals(store.getRoot(), after); + addTestContent(store, 1); + after = compactor.compact(initial, store.getRoot(), initial); + Assert.assertEquals(store.getRoot(), after); + } - } finally { - source.close(); - } + @Test + public void testCancel() throws Throwable { + NodeStore store = SegmentNodeStore.newSegmentNodeStore(segmentStore).create(); + // Wait for the Compactor to set itself up, and cancel its work before + // the CancelableDiff has a chance to execute. The early cancellation is + // also the reason why the returned SegmentNodeState doesn't have the + // child named "b". + + final CountDownLatch onBeforeProcess = new CountDownLatch(1); + + final Compactor compactor = new Compactor(segmentStore.getTracker().getWriter()) { + + @Override + protected void onBeforeProcess() { + onBeforeProcess.countDown(); + } + + }; + + ExecutorService executor = Executors.newSingleThreadExecutor(); + + executor.submit(new Runnable() { + + @Override + public void run() { + try { + onBeforeProcess.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + compactor.cancel(); + } + + }); + + SegmentNodeState sns = compactor.compact(store.getRoot(), addChild(store.getRoot(), "b"), store.getRoot()); + + // If the interrupted flag of the current thread is set, compaction + // should stop as soon as possible, and the compactor should signal an + // external interruption. The returned SegmentNodeState may not contain + // every compacted change. + + assertTrue(compactor.wasCanceled()); + assertFalse(sns.hasChildNode("b")); + + executor.shutdown(); } + private NodeState addChild(NodeState current, String name) { + NodeBuilder builder = current.builder(); + builder.child(name); + return builder.getNodeState(); + } + private static void init(NodeStore store) { new Oak(store).with(new OpenSecurityProvider()) .createContentRepository();