Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (revision 1706189) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (working copy) @@ -16,18 +16,6 @@ */ package org.apache.jackrabbit.oak.plugins.segment; -import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Maps.newHashMap; -import static org.apache.jackrabbit.oak.api.Type.BINARIES; -import static org.apache.jackrabbit.oak.api.Type.BINARY; -import static org.apache.jackrabbit.oak.commons.PathUtils.concat; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.hash.Hashing; @@ -47,6 +35,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.newHashMap; +import static org.apache.jackrabbit.oak.api.Type.BINARIES; +import static org.apache.jackrabbit.oak.api.Type.BINARY; +import static org.apache.jackrabbit.oak.commons.PathUtils.concat; + /** * Tool for compacting segments. */ @@ -88,6 +88,12 @@ */ private final boolean cloneBinaries; + /** + * This flag is true if the thread running the compaction process received + * an interrupt. + */ + private boolean interrupted; + public Compactor(SegmentWriter writer) { this.writer = writer; this.map = new InMemoryCompactionMap(writer.getTracker()); @@ -109,11 +115,27 @@ protected SegmentNodeBuilder process(NodeState before, NodeState after, NodeState onto) { SegmentNodeBuilder builder = new SegmentNodeBuilder(writer.writeNode(onto), writer); - after.compareAgainstBaseState(before, new CompactDiff(builder)); + InterruptibleDiff diff = new InterruptibleDiff(new CompactDiff(builder)); + interrupted = false; + after.compareAgainstBaseState(before, diff); + interrupted = diff.wasInterrupted(); return builder; } /** + * If {@code true}, the compaction process was interrupted. In this case, + * the {@code SegmentNodeState} returned by {@link #compact(NodeState, + * NodeState)} and {@link #compact(NodeState, NodeState, NodeState)} + * contains only the changes that were compacted before the compaction + * process was interrupted. + * + * @return {@code true} if the compaction process was interrupted. + */ + public boolean wasInterrupted() { + return interrupted; + } + + /** * Compact the differences between a {@code before} and a {@code after} * on top of the {@code before} state. *

Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java (working copy) @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; + +class InterruptibleDiff implements NodeStateDiff { + + private final NodeStateDiff delegate; + + private boolean interrupted = false; + + public InterruptibleDiff(NodeStateDiff delegate) { + this.delegate = delegate; + } + + @Override + public final boolean propertyAdded(PropertyState after) { + if (isInterrupted()) { + return interrupted(); + } else { + return delegate.propertyAdded(after); + } + } + + @Override + public final boolean propertyChanged(PropertyState before, PropertyState after) { + if (isInterrupted()) { + return interrupted(); + } else { + return delegate.propertyChanged(before, after); + } + } + + @Override + public final boolean propertyDeleted(PropertyState before) { + if (isInterrupted()) { + return interrupted(); + } else { + return delegate.propertyDeleted(before); + } + } + + @Override + public final boolean childNodeAdded(String name, NodeState after) { + if (isInterrupted()) { + return interrupted(); + } else { + return delegate.childNodeAdded(name, after); + } + } + + @Override + public final boolean childNodeChanged(String name, NodeState before, NodeState after) { + if (isInterrupted()) { + return interrupted(); + } else { + return delegate.childNodeChanged(name, before, after); + } + } + + @Override + public final boolean childNodeDeleted(String name, NodeState before) { + if (isInterrupted()) { + return interrupted(); + } else { + return delegate.childNodeDeleted(name, before); + } + } + + public boolean wasInterrupted() { + return interrupted; + } + + private boolean isInterrupted() { + return Thread.currentThread().isInterrupted(); + } + + private boolean interrupted() { + interrupted = true; + return false; + } + +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiff.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision 1706189) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (working copy) @@ -293,4 +293,8 @@ this.offlineCompaction = offlineCompaction; } + public boolean isDiskSpaceSufficient(long repositoryDiskSpace, long availableDiskSpace) { + return true; + } + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java (revision 1706189) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java (working copy) @@ -16,13 +16,13 @@ */ package org.apache.jackrabbit.oak.plugins.segment.file; -import static java.lang.System.currentTimeMillis; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.Date; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static java.lang.System.currentTimeMillis; /** * A small wrapper around the Thread class that periodically calls a runnable. @@ -62,13 +62,57 @@ @Override public void run() { - try { - while (waitUntilNextIteration()) { + boolean alive = true; + + boolean interrupted = false; + + while (alive) { + + // Wait until the next iteration occurs. The waiting can be + // interrupted. If this happens, we will restore the interrupted + // flag later for the Runnable to catch. + + while (true) { + try { + onBeforeWait(); + alive = waitUntilNextIteration(); + break; + } catch (InterruptedException e) { + onInterrupt(); + interrupted = true; + } + } + + // At this point, the thread was notified or the waiting time + // expired. Additionally, the thread might have been interrupted. + // Interrupts don't terminate the thread, but the interrupted status + // may be checked by the Runnable to promptly cancel its activities. + // Termination of this thread is possible by calling the close() + // method. + + if (alive) { setName(name + ", active since " + new Date() + ", previous max duration " + maxDuration + "ms"); long start = currentTimeMillis(); - super.run(); + + // If the thread was interrupted, restore the interrupted flag + // of the thread, so the Runnable can act accordingly. + + if (interrupted) { + Thread.currentThread().interrupt(); + } + + // Invoke the runnable and, afterwards, clear the interrupted + // flag of the thread, so we can restart from a clean slate + // when waiting for the next iteration. + + try { + super.run(); + } finally { + interrupted = Thread.interrupted(); + } + long duration = currentTimeMillis() - start; iterations++; @@ -80,9 +124,6 @@ + ", avg " + (sumDuration / iterations) + "ms" + ", max " + maxDuration + "ms"); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error(name + " interrupted", e); } } @@ -90,6 +131,14 @@ trigger(false); } + protected void onBeforeWait() { + // Callback for testing purposes. + } + + protected void onInterrupt() { + // Callback for testing purposes. + } + @Override public void close() { try { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1706189) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy) @@ -16,47 +16,8 @@ */ package org.apache.jackrabbit.oak.plugins.segment.file; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Lists.newArrayListWithCapacity; -import static com.google.common.collect.Lists.newLinkedList; -import static com.google.common.collect.Maps.newHashMap; -import static com.google.common.collect.Sets.newHashSet; -import static java.lang.String.format; -import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonMap; -import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; -import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; -import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileLock; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.annotation.Nonnull; - import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; - import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; @@ -80,6 +41,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileLock; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Lists.newArrayListWithCapacity; +import static com.google.common.collect.Lists.newLinkedList; +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Sets.newHashSet; +import static java.lang.String.format; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; +import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION; + /** * The storage implementation for tar files. */ @@ -149,6 +147,13 @@ */ private final BackgroundThread compactionThread; + /** + * This background thread checks the available disk space, and compares it + * with the size of the repository. If the available disk space is not + * enough for compaction to run, the compaction thread is interrupted. + */ + private final BackgroundThread diskSpaceCheckThread; + private CompactionStrategy compactionStrategy = NO_COMPACTION; /** @@ -457,9 +462,19 @@ maybeCompact(true); } }); + + this.diskSpaceCheckThread = new BackgroundThread("TarMK disk space check thread [" + directory + "]", 30 * 1000, new Runnable() { + + @Override + public void run() { + checkDiskSpace(); + } + + }); } else { this.flushThread = null; this.compactionThread = null; + this.diskSpaceCheckThread = null; } if (readonly) { @@ -492,6 +507,21 @@ return false; } + long repositoryDiskSpace = size(); + long avaiableDiskSpace = directory.getFreeSpace(); + + if (!compactionStrategy.isDiskSpaceSufficient(repositoryDiskSpace, avaiableDiskSpace)) { + gcMonitor.skipped("Not enough disk space available ({}), if compared with current repository size ({}), skipping compaction", + humanReadableByteCount(avaiableDiskSpace), + humanReadableByteCount(repositoryDiskSpace)); + + if (cleanup) { + cleanupNeeded.set(true); + } + + return false; + } + Stopwatch watch = Stopwatch.createStarted(); compactionStrategy.setCompactionStart(System.currentTimeMillis()); boolean compacted = false; @@ -789,6 +819,11 @@ SegmentNodeState after = compactor.compact(EMPTY_NODE, before, EMPTY_NODE); + if (compactor.wasInterrupted()) { + gcMonitor.warn("TarMK compaction was interrupted, transient changes will be discarded at next cleanup"); + return; + } + Callable setHead = new SetHead(before, after, compactor); try { int cycles = 0; @@ -802,6 +837,12 @@ "Compacting these commits. Cycle {}", cycles); SegmentNodeState head = getHead(); after = compactor.compact(before, head, after); + + if (compactor.wasInterrupted()) { + gcMonitor.warn("TarMK compaction was interrupted, transient changes will be discarded at next cleanup"); + break; + } + before = head; setHead = new SetHead(head, after, compactor); } @@ -875,6 +916,7 @@ // threads before acquiring the synchronization lock closeAndLogOnFail(compactionThread); closeAndLogOnFail(flushThread); + closeAndLogOnFail(diskSpaceCheckThread); synchronized (this) { try { flush(); @@ -1093,6 +1135,21 @@ persistedHead.set(id); } + private void checkDiskSpace() { + long repositoryDiskSpace = size(); + long availableDiskSpace = directory.getFreeSpace(); + + if (compactionStrategy.isDiskSpaceSufficient(repositoryDiskSpace, availableDiskSpace)) { + return; + } + + log.warn("Available disk space ({}) too low, if compared to current repository size ({})", + humanReadableByteCount(availableDiskSpace), + humanReadableByteCount(repositoryDiskSpace)); + + compactionThread.interrupt(); + } + /** * A read only {@link FileStore} implementation that supports * going back to old revisions. Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (revision 1706189) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (working copy) @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Version("5.0.0") +@Version("5.1.0") @Export(optional = "provide:=true") package org.apache.jackrabbit.oak.plugins.segment; Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (revision 1706189) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java (working copy) @@ -17,7 +17,6 @@ package org.apache.jackrabbit.oak.plugins.segment; import junit.framework.Assert; - import org.apache.jackrabbit.oak.Oak; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore; @@ -27,33 +26,84 @@ import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import static org.apache.jackrabbit.oak.plugins.segment.TestUtils.runInNewThread; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class CompactorTest { + private SegmentStore segmentStore; + + @Before + public void openSegmentStore() { + segmentStore = new MemoryStore(); + } + + @After + public void closeSegmentStore() { + segmentStore.close(); + } + @Test public void testCompactor() throws Exception { - MemoryStore source = new MemoryStore(); - try { - NodeStore store = new SegmentNodeStore(source); - init(store); + NodeStore store = new SegmentNodeStore(segmentStore); + init(store); - Compactor compactor = new Compactor(source.getTracker().getWriter()); - addTestContent(store, 0); + Compactor compactor = new Compactor(segmentStore.getTracker().getWriter()); + addTestContent(store, 0); - NodeState initial = store.getRoot(); - SegmentNodeState after = compactor - .compact(initial, store.getRoot(), initial); - Assert.assertEquals(store.getRoot(), after); + NodeState initial = store.getRoot(); + SegmentNodeState after = compactor + .compact(initial, store.getRoot(), initial); + Assert.assertEquals(store.getRoot(), after); - addTestContent(store, 1); - after = compactor.compact(initial, store.getRoot(), initial); - Assert.assertEquals(store.getRoot(), after); + addTestContent(store, 1); + after = compactor.compact(initial, store.getRoot(), initial); + Assert.assertEquals(store.getRoot(), after); + } - } finally { - source.close(); - } + @Test + public void testInterrupt() throws Throwable { + runInNewThread(new Runnable() { + @Override + public void run() { + Compactor compactor = new Compactor(segmentStore.getTracker().getWriter()); + assertFalse(compactor.wasInterrupted()); + + NodeStore store = SegmentNodeStore.newSegmentNodeStore(segmentStore).create(); + + // If the interrupted flag of the current thread is not set, + // compaction should run until the end, and the compactor should + // not signal any external interruption. The returned + // SegmentNodeState contains every compacted change. + + SegmentNodeState a = compactor.compact(store.getRoot(), addChild(store.getRoot(), "a"), store.getRoot()); + assertFalse(compactor.wasInterrupted()); + assertTrue(a.hasChildNode("a")); + + // If the interrupted flag of the current thread is set, + // compaction should stop as soon as possible, and the compactor + // should signal an external interruption. The returned + // SegmentNodeState may not contain every compacted change. + + Thread.currentThread().interrupt(); + SegmentNodeState b = compactor.compact(store.getRoot(), addChild(store.getRoot(), "b"), store.getRoot()); + assertTrue(compactor.wasInterrupted()); + assertFalse(b.hasChildNode("b")); + } + + private NodeState addChild(NodeState current, String name) { + NodeBuilder builder = current.builder(); + builder.child(name); + return builder.getNodeState(); + } + + }); } private static void init(NodeStore store) { Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiffTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiffTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiffTest.java (working copy) @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; +import org.junit.Test; + +import static org.apache.jackrabbit.oak.plugins.segment.TestUtils.runInNewThread; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class InterruptibleDiffTest { + + @Test + public void testPropertyAddedInterruptible() throws Throwable { + runInNewThread(new Runnable() { + + @Override + public void run() { + PropertyState after = mock(PropertyState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).propertyAdded(after); + + InterruptibleDiff diff = new InterruptibleDiff(wrapped); + + assertTrue(diff.propertyAdded(after)); + assertFalse(diff.wasInterrupted()); + + Thread.currentThread().interrupt(); + assertFalse(diff.propertyAdded(after)); + assertTrue(diff.wasInterrupted()); + } + + }); + } + + @Test + public void testPropertyChangedInterruptible() throws Throwable { + runInNewThread(new Runnable() { + + @Override + public void run() { + PropertyState before = mock(PropertyState.class); + PropertyState after = mock(PropertyState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).propertyChanged(before, after); + + InterruptibleDiff diff = new InterruptibleDiff(wrapped); + + assertTrue(diff.propertyChanged(before, after)); + assertFalse(diff.wasInterrupted()); + + Thread.currentThread().interrupt(); + assertFalse(diff.propertyChanged(before, after)); + assertTrue(diff.wasInterrupted()); + } + + }); + } + + @Test + public void testPropertyDeletedInterruptible() throws Throwable { + runInNewThread(new Runnable() { + + @Override + public void run() { + PropertyState before = mock(PropertyState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).propertyDeleted(before); + + InterruptibleDiff diff = new InterruptibleDiff(wrapped); + + assertTrue(diff.propertyDeleted(before)); + assertFalse(diff.wasInterrupted()); + + Thread.currentThread().interrupt(); + assertFalse(diff.propertyDeleted(before)); + assertTrue(diff.wasInterrupted()); + } + + }); + } + + @Test + public void testChildNodeAddedInterruptible() throws Throwable { + runInNewThread(new Runnable() { + + @Override + public void run() { + NodeState after = mock(NodeState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).childNodeAdded("name", after); + + InterruptibleDiff diff = new InterruptibleDiff(wrapped); + + assertTrue(diff.childNodeAdded("name", after)); + assertFalse(diff.wasInterrupted()); + + Thread.currentThread().interrupt(); + assertFalse(diff.childNodeAdded("name", after)); + assertTrue(diff.wasInterrupted()); + } + + }); + } + + @Test + public void testChildNodeChangedInterruptible() throws Throwable { + runInNewThread(new Runnable() { + + @Override + public void run() { + NodeState before = mock(NodeState.class); + NodeState after = mock(NodeState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).childNodeChanged("name", before, after); + + InterruptibleDiff diff = new InterruptibleDiff(wrapped); + + assertTrue(diff.childNodeChanged("name", before, after)); + assertFalse(diff.wasInterrupted()); + + Thread.currentThread().interrupt(); + assertFalse(diff.childNodeChanged("name", before, after)); + assertTrue(diff.wasInterrupted()); + } + + }); + } + + @Test + public void testChildNodeDeletedInterruptible() throws Throwable { + runInNewThread(new Runnable() { + + @Override + public void run() { + NodeState before = mock(NodeState.class); + + NodeStateDiff wrapped = mock(NodeStateDiff.class); + doReturn(true).when(wrapped).childNodeDeleted("name", before); + + InterruptibleDiff diff = new InterruptibleDiff(wrapped); + + assertTrue(diff.childNodeDeleted("name", before)); + assertFalse(diff.wasInterrupted()); + + Thread.currentThread().interrupt(); + assertFalse(diff.childNodeDeleted("name", before)); + assertTrue(diff.wasInterrupted()); + } + + }); + } + +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/InterruptibleDiffTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/TestUtils.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/TestUtils.java (revision 1706189) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/TestUtils.java (working copy) @@ -69,4 +69,32 @@ return map; } + public static void runInNewThread(Runnable runnable) throws Throwable { + final Holder holder = new Holder(); + + Thread thread = new Thread(runnable); + + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + + @Override + public void uncaughtException(Thread t, Throwable e) { + holder.held = e; + } + + }); + + thread.start(); + thread.join(); + + if (holder.held != null) { + throw holder.held; + } + } + + private static class Holder { + + public T held; + + } + } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThreadTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThreadTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThreadTest.java (working copy) @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.file; + +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class BackgroundThreadTest { + + @Test + public void testClose() { + BackgroundThread thread = new BackgroundThread("test", 10, new Runnable() { + + @Override + public void run() { + // Do nothing. + } + + }); + + assertTrue(thread.isAlive()); + thread.close(); + assertFalse(thread.isAlive()); + } + + @Test + public void testScheduledException() throws Exception { + final CountDownLatch executed = new CountDownLatch(1); + + BackgroundThread thread = new BackgroundThread("test", 10, new Runnable() { + + @Override + public void run() { + executed.countDown(); + } + + }); + + executed.await(); + thread.close(); + } + + @Test + public void testTriggeredExecution() throws Exception { + final CountDownLatch executed = new CountDownLatch(1); + + Runnable runnable = new Runnable() { + + @Override + public void run() { + executed.countDown(); + } + + }; + + final CountDownLatch beforeWaiting = new CountDownLatch(1); + + BackgroundThread thread = new BackgroundThread("test", -1, runnable) { + + @Override + protected void onBeforeWait() { + beforeWaiting.countDown(); + } + + }; + + beforeWaiting.await();; + thread.trigger(); + + executed.await(); + thread.close(); + } + + @Test + public void testInterruptScheduledExecution() throws Exception { + final CountDownLatch executed = new CountDownLatch(1); + + final AtomicBoolean interrupted = new AtomicBoolean(false); + + Runnable runnable = new Runnable() { + + @Override + public void run() { + interrupted.set(Thread.currentThread().isInterrupted()); + executed.countDown(); + } + + }; + + final CountDownLatch beforeWaiting = new CountDownLatch(1); + + BackgroundThread thread = new BackgroundThread("test", 10, runnable) { + + @Override + protected void onBeforeWait() { + boolean interrupted = false; + + while (true) { + try { + beforeWaiting.await(); + break; + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + }; + + thread.interrupt(); + beforeWaiting.countDown(); + + executed.await(); + assertTrue(interrupted.get()); + thread.close(); + } + + @Test + public void testInterruptTriggeredExecution() throws Exception { + final CountDownLatch executed = new CountDownLatch(1); + + final AtomicBoolean interrupted = new AtomicBoolean(false); + + Runnable runnable = new Runnable() { + + @Override + public void run() { + interrupted.set(Thread.currentThread().isInterrupted()); + executed.countDown(); + } + + }; + + final CountDownLatch onInterrupt = new CountDownLatch(1); + + BackgroundThread thread = new BackgroundThread("test", -1, runnable) { + + @Override + protected void onInterrupt() { + onInterrupt.countDown(); + } + + }; + + thread.interrupt(); + + onInterrupt.await(); + thread.trigger(); + + executed.await(); + assertTrue(interrupted.get()); + thread.close(); + } + +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThreadTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property