Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (date 1414053809000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (date 1413989611000) @@ -82,7 +82,7 @@ * Only a single local commit is allowed at a time. When such * a commit is in progress, no external updates will be seen. */ - private final Semaphore commitSemaphore = new Semaphore(1); + public final Semaphore commitSemaphore = new Semaphore(1); private long maximumBackoff = MILLISECONDS.convert(10, SECONDS); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (date 1414053809000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (date 1413989611000) @@ -44,6 +44,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; @@ -487,12 +488,16 @@ humanReadableByteCount(initialSize - finalSize)); } + public void compact() { + compact(null); + } + /** * Copy every referenced record in data (non-bulk) segments. Bulk segments * are fully kept (they are only removed in cleanup, if there is no * reference to them). */ - public void compact() { + public void compact(Semaphore semaphore) { long start = System.nanoTime(); log.info("TarMK compaction running"); @@ -510,25 +515,60 @@ SegmentNodeState after = compactor.compact(EMPTY_NODE, before); writer.flush(); - while (!setHead(before, after)) { + while (!setHead(before, after, semaphore, compactor)) { // Some other concurrent changes have been made. // Rebase (and compact) those changes on top of the // compacted state before retrying to set the head. SegmentNodeState head = getHead(); - after = compactor.compact(before, head); + after = compactor.compact(after, head); before = head; writer.flush(); } + + log.info("TarMK compaction completed in {}ms", MILLISECONDS + .convert(System.nanoTime() - start, NANOSECONDS)); + } + + private boolean setHead(SegmentNodeState before, SegmentNodeState after, Semaphore semaphore, Compactor compactor) { + if (tryAcquire(semaphore)) { + // Need to acquire SegmentNodeStore.commitSemaphore to prevent doing setHead + // while a commit is in progress. If we don't do this, SegmentNodeStore.Commit.prepare() + // might do a rebase using before and after states from *before* compaction, which + // leads to a rebased state referring to un-compacted states. + try { + boolean success = setHead(before, after); + if (success) { - tracker.setCompactionMap(compactor.getCompactionMap()); + tracker.setCompactionMap(compactor.getCompactionMap()); - // Drop the SegmentWriter caches and flush any existing state - // in an attempt to prevent new references to old pre-compacted - // content. TODO: There should be a cleaner way to do this. + // Drop the SegmentWriter caches and flush any existing state + // in an attempt to prevent new references to old pre-compacted + // content. TODO: There should be a cleaner way to do this. + // We need to do this inside the commitSemaphore. Doing it before + // might result will result in the new segment still referring to the + // un-compacted one. Doing it afterwards opens a race where a new head, + // which refers to the compacted head, might end up in the old, + // un-compacted segment. Both cases cause the un-compacted state to + // remain references from the compacted one. - tracker.getWriter().dropCache(); - tracker.getWriter().flush(); + tracker.getWriter().dropCache(); + tracker.getWriter().flush(); + } + return success; + } finally { + release(semaphore); + } + } else { + return false; + } + } - log.info("TarMK compaction completed in {}ms", MILLISECONDS - .convert(System.nanoTime() - start, NANOSECONDS)); + private static boolean tryAcquire(Semaphore semaphore) { + return semaphore == null || semaphore.tryAcquire(); + } + + private static void release(Semaphore semaphore) { + if (semaphore != null) { + semaphore.release(); + } } public synchronized Iterable getSegmentIds() { Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java (date 1414053809000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java (date 1413989611000) @@ -193,7 +193,6 @@ } } - @Ignore("OAK-2192") // FIXME OAK-2192 @Test public void testMixedSegments() throws Exception { FileStore store = new FileStore(directory, 2, false); @@ -228,7 +227,7 @@ }); t.start(); - store.compact(); + store.compact(nodeStore.commitSemaphore); run.set(false); t.join();