Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java (date 1477551936000)
+++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java (date 1477561121000)
@@ -434,14 +434,7 @@
     @Test
     public void testCancel() throws IOException {
         NodeBuilder builder = EMPTY_NODE.builder();
-        SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
-                store,
-                store.getTracker().getSegmentCounter(),
-                store.getReader(),
-                "test",
-                0
-        );
-        NodeState state = writer.writeNode(builder.getNodeState(), bufferWriter, Suppliers.ofInstance(true));
+        NodeState state = writer.writeNode(builder.getNodeState(), Suppliers.ofInstance(true));
         assertNull(state);
     }
 
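The test above now drives the two-argument writeNode: with the SegmentBufferWriter argument gone, cancellation is controlled by the supplier alone. A minimal sketch of that contract, not part of the patch (the class and method names are hypothetical; it assumes a SegmentWriter obtained from an initialised store, plus JUnit and Guava on the classpath):

import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import com.google.common.base.Suppliers;
import java.io.IOException;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;

public class WriteNodeCancellationSketch {

    // 'writer' stands in for a SegmentWriter obtained from an initialised store.
    void demo(SegmentWriter writer) throws IOException {
        NodeBuilder builder = EMPTY_NODE.builder();

        // A supplier that is already true cancels the write: writeNode returns null.
        SegmentNodeState cancelled =
                writer.writeNode(builder.getNodeState(), Suppliers.ofInstance(true));
        assertNull(cancelled);

        // A supplier that stays false lets the write run to completion.
        SegmentNodeState written =
                writer.writeNode(builder.getNodeState(), Suppliers.ofInstance(false));
        assertNotNull(written);
    }
}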
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (date 1477551936000)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (date 1477561121000)
@@ -350,19 +350,13 @@
 
     /**
      * Write a node state, unless cancelled using a dedicated write operation handler.
-     * The write operation handler is automatically {@link WriteOperationHandler#flush() flushed}
-     * once the node has been written successfully.
      * @param state  node state to write
-     * @param writeOperationHandler  the write operation handler through which all write calls
-     *                               induced by by this call are routed.
      * @param cancel  supplier to signal cancellation of this write operation
      * @return segment node state equal to {@code state} or {@code null} if cancelled.
      * @throws IOException
      */
     @CheckForNull
-    public SegmentNodeState writeNode(@Nonnull final NodeState state,
-                                      @Nonnull WriteOperationHandler writeOperationHandler,
-                                      @Nonnull Supplier<Boolean> cancel)
+    public SegmentNodeState writeNode(@Nonnull final NodeState state, @Nonnull Supplier<Boolean> cancel)
     throws IOException {
         try {
             RecordId nodeId = writeOperationHandler.execute(new SegmentWriteOperation(cancel) {
@@ -371,7 +365,6 @@
                     return with(writer).writeNode(state);
                 }
             });
-            writeOperationHandler.flush();
             return new SegmentNodeState(reader, this, nodeId);
         } catch (SegmentWriteOperation.CancelledWriteException ignore) {
             return null;
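Note that the implicit flush is gone from writeNode, so persisting the written records is now the caller's responsibility; the FileStore change below does exactly that by calling writer.flush() after a successful compaction. A minimal sketch of the resulting pattern (the helper class and method names are hypothetical):

import com.google.common.base.Supplier;
import java.io.IOException;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
import org.apache.jackrabbit.oak.spi.state.NodeState;

public final class WriteAndFlushSketch {

    // writeNode no longer flushes implicitly, so flush once the write has succeeded.
    static SegmentNodeState writeAndPersist(SegmentWriter writer, NodeState state,
            Supplier<Boolean> cancel) throws IOException {
        SegmentNodeState written = writer.writeNode(state, cancel);
        if (written != null) {
            // Persist the buffered segments; skipped when the write was cancelled.
            writer.flush();
        }
        return written;
    }
}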
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (date 1477551936000)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (date 1477561121000)
@@ -75,15 +75,13 @@
 import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
 import org.apache.jackrabbit.oak.segment.SegmentId;
 import org.apache.jackrabbit.oak.segment.SegmentIdTable;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.WriterCacheManager.Default;
+import org.apache.jackrabbit.oak.segment.WriterCacheManager;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
@@ -172,7 +170,8 @@
                 .with(builder.getCacheManager())
                 .build(this);
         this.maxFileSize = builder.getMaxFileSize() * MB;
-        this.garbageCollector = new GarbageCollector(builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory));
+        this.garbageCollector = new GarbageCollector(
+                builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory), builder.getCacheManager());
 
         Map<Integer, Map<Character, File>> map = collectFiles(directory);
@@ -709,14 +708,20 @@
         @Nonnull
         private final GCJournal gcJournal;
 
+        @Nonnull
+        private final WriterCacheManager cacheManager;
+
         private volatile boolean cancelled;
 
-        GarbageCollector(@Nonnull SegmentGCOptions gcOptions,
-                         @Nonnull GCListener gcListener,
-                         @Nonnull GCJournal gcJournal) {
+        GarbageCollector(
+                @Nonnull SegmentGCOptions gcOptions,
+                @Nonnull GCListener gcListener,
+                @Nonnull GCJournal gcJournal,
+                @Nonnull WriterCacheManager cacheManager) {
             this.gcOptions = gcOptions;
             this.gcListener = gcListener;
             this.gcJournal = gcJournal;
+            this.cacheManager = cacheManager;
         }
 
         synchronized void run() throws IOException {
@@ -837,11 +842,15 @@
             gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
 
             SegmentNodeState before = getHead();
-            final int newGeneration = getGcGeneration() + 1;
-            SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
-                    FileStore.this, tracker.getSegmentCounter(), segmentReader, "c", newGeneration);
             Supplier<Boolean> cancel = new CancelCompactionSupplier(FileStore.this);
-            SegmentNodeState after = compact(bufferWriter, before, cancel);
+            final int newGeneration = getGcGeneration() + 1;
+            SegmentWriter writer = segmentWriterBuilder("c")
+                    .with(cacheManager)
+                    .withGeneration(newGeneration)
+                    .withoutWriterPool()
+                    .build(FileStore.this);
+
+            SegmentNodeState after = compact(before, writer, cancel);
             if (after == null) {
                 gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
                 return 0;
@@ -862,7 +871,7 @@
                             "Compacting these commits. Cycle {} of {}",
                             GC_COUNT, cycles, gcOptions.getRetryCount());
                     SegmentNodeState head = getHead();
-                    after = compact(bufferWriter, head, cancel);
+                    after = compact(head, writer, cancel);
                     if (after == null) {
                         gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
                         return 0;
@@ -881,7 +890,7 @@
                     gcListener.info("TarMK GC #{}: trying to force compact remaining commits for {} seconds",
                             GC_COUNT, forceTimeout);
                     cycles++;
-                    success = forceCompact(bufferWriter, or(cancel, timeOut(forceTimeout, SECONDS)));
+                    success = forceCompact(writer, or(cancel, timeOut(forceTimeout, SECONDS)));
                     if (!success) {
                         if (cancel.get()) {
                             gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
@@ -895,6 +904,7 @@
             }
 
             if (success) {
+                writer.flush();
                 gcListener.compactionSucceeded(newGeneration);
                 gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
                         GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
@@ -947,20 +957,17 @@
             }
         }
 
-        private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head,
-                                         Supplier<Boolean> cancel)
+        private SegmentNodeState compact(NodeState head, SegmentWriter writer, Supplier<Boolean> cancel)
         throws IOException {
             if (gcOptions.isOffline()) {
-                BlobStore blobStore = getBlobStore();
-                SegmentWriter writer = new SegmentWriter(FileStore.this, segmentReader, blobStore, new Default(), bufferWriter);
-                return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
+                return new Compactor(segmentReader, writer, getBlobStore(), cancel, gcOptions)
                         .compact(EMPTY_NODE, head, EMPTY_NODE);
             } else {
-                return segmentWriter.writeNode(head, bufferWriter, cancel);
+                return writer.writeNode(head, cancel);
             }
         }
 
-        private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
+        private boolean forceCompact(@Nonnull final SegmentWriter writer,
                                      @Nonnull final Supplier<Boolean> cancel)
         throws InterruptedException {
             return revisions.
@@ -970,8 +977,8 @@
                 public RecordId apply(RecordId base) {
                     try {
                         long t0 = currentTimeMillis();
-                        SegmentNodeState after = compact(bufferWriter,
-                                segmentReader.readNode(base), cancel);
+                        SegmentNodeState after = compact(
+                                segmentReader.readNode(base), writer, cancel);
                         if (after == null) {
                             gcListener.info("TarMK GC #{}: compaction cancelled after {} seconds",
                                     GC_COUNT, (currentTimeMillis() - t0) / 1000);
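The compaction writer is now assembled through the segment writer builder instead of a hand-rolled SegmentBufferWriter plus ad-hoc SegmentWriter. Roughly the same construction in isolation, as a sketch (the helper name and the static-import path are assumptions; the GC itself derives the generation from getGcGeneration() + 1):

import static org.apache.jackrabbit.oak.segment.SegmentWriterBuilder.segmentWriterBuilder;

import org.apache.jackrabbit.oak.segment.SegmentWriter;
import org.apache.jackrabbit.oak.segment.WriterCacheManager;
import org.apache.jackrabbit.oak.segment.file.FileStore;

public final class CompactionWriterSketch {

    static SegmentWriter newCompactionWriter(FileStore store, WriterCacheManager cacheManager,
            int newGeneration) {
        // "c" marks the segments this writer creates as compaction output.
        return segmentWriterBuilder("c")
                .with(cacheManager)            // reuse the deduplication caches across GC cycles
                .withGeneration(newGeneration) // write into the next GC generation
                .withoutWriterPool()           // GC writes single-threaded; no pooled buffers
                .build(store);
    }
}

Passing the cache manager down from the FileStore builder (rather than creating a fresh WriterCacheManager.Default per run, as the removed offline-compaction branch did) is what lets the deduplication caches survive across garbage collection cycles.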