Index: oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java	(date 1464767331000)
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java	(date 1464767926000)
@@ -179,8 +179,8 @@
     static void compact(File directory, boolean force) throws IOException {
         FileStore store = openFileStore(directory.getAbsolutePath(), force);
+        store.getGcOptions().setOffline(true);
         try {
-            boolean persistCM = Boolean.getBoolean("tar.PersistCompactionMap");
            System.out.println("Compacting " + directory);
            System.out.println("    before " + Arrays.toString(directory.list()));
            long sizeBefore = FileUtils.sizeOfDirectory(directory);
@@ -195,6 +195,7 @@
             System.out.println("    -> cleaning up");
             store = openFileStore(directory.getAbsolutePath(), false);
+            store.getGcOptions().setOffline(true);
             try {
                 for (File file : store.cleanup()) {
                     if (!file.exists() || file.delete()) {
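
Editor's note: the tool change above enables offline mode both on the store used for compaction and on the store reopened for cleanup. Outside of oak-run, the same sequence can be expressed through the public FileStore API introduced further down in this patch; a minimal sketch (the path and the bare builder call are illustrative only, error handling elided):

    FileStore store = FileStore.builder(new File("/path/to/segmentstore")).build();
    try {
        store.getGcOptions().setOffline(true); // route compaction through the offline Compactor
        store.compact();
        store.cleanup();
    } finally {
        store.close();
    }
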
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java	(date 1464767331000)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java	(date 1464767926000)
@@ -81,7 +81,13 @@
     private int lockWaitTime = LOCK_WAIT_TIME_DEFAULT;
 
     private int retainedGenerations = RETAINED_GENERATIONS_DEFAULT;
-
+
+    /**
+     * Flag that allows turning on an optimized version of the compaction
+     * process in the case of offline compaction.
+     */
+    private boolean offline = false;
+
     public SegmentGCOptions(boolean paused, int memoryThreshold, int gainThreshold,
                             int retryCount, boolean forceAfterFail, int lockWaitTime) {
         this.paused = paused;
@@ -245,7 +251,8 @@
                 ", retryCount=" + retryCount +
                 ", forceAfterFail=" + forceAfterFail +
                 ", lockWaitTime=" + lockWaitTime +
-                ", retainedGenerations=" + retainedGenerations + '}';
+                ", retainedGenerations=" + retainedGenerations +
+                ", offline=" + offline + "}";
     }
 
     /**
@@ -262,4 +269,11 @@
         return availableDiskSpace > 0.25 * repositoryDiskSpace;
     }
 
+    public boolean isOffline() {
+        return offline;
+    }
+
+    public void setOffline(boolean offline) {
+        this.offline = offline;
+    }
 }
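
Editor's note: the new setting is a plain bean property on SegmentGCOptions, off by default and reported by toString(). A short sketch of toggling it; the constructor arguments below are placeholders for illustration, not the library defaults:

    SegmentGCOptions gcOptions = new SegmentGCOptions(
            false,   // paused
            5,       // memoryThreshold
            10,      // gainThreshold
            5,       // retryCount
            false,   // forceAfterFail
            60000);  // lockWaitTime
    gcOptions.setOffline(true);
    assert gcOptions.isOffline();
    System.out.println(gcOptions); // toString() now also prints "offline=true"
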
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java	(date 1464767331000)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java	(date 1464767926000)
@@ -78,6 +78,7 @@
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
+import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
@@ -1133,11 +1134,17 @@
         }
     }
 
-    private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState node,
-                                     Supplier<Boolean> cancel)
-    throws IOException {
-        return segmentWriter.writeNode(node, bufferWriter, cancel);
-    }
+    private SegmentNodeState compact(SegmentBufferWriter bufferWriter,
+            NodeState head, Supplier<Boolean> cancel) throws IOException {
+        if (gcOptions.isOffline()) {
+            // offline compaction is routed through the Compactor for additional optimizations
+            SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, tracker, bufferWriter);
+            return new Compactor(segmentReader, writer, blobStore, cancel)
+                    .compact(EMPTY_NODE, head, EMPTY_NODE);
+        } else {
+            return segmentWriter.writeNode(head, bufferWriter, cancel);
+        }
+    }
 
     private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
             @Nonnull final Supplier<Boolean> cancel)
@@ -1624,5 +1631,9 @@
         public void cleaned(long reclaimedSize, long currentSize) {
             delegatee.cleaned(reclaimedSize, currentSize);
         }
+    }
+
+    public SegmentGCOptions getGcOptions() {
+        return gcOptions;
     }
 }
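
Editor's note: the cancel argument threaded through compact() above is a plain Guava Supplier<Boolean>; FileStore wires up its own cancellation logic, but when driving the Compactor directly (as the tests at the end of this patch do) a deadline-based supplier is one simple option. Illustrative sketch only, not part of the patch (Supplier is com.google.common.base.Supplier, TimeUnit is java.util.concurrent.TimeUnit):

    final long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(30);
    Supplier<Boolean> cancel = new Supplier<Boolean>() {
        @Override
        public Boolean get() {
            // request cancellation once the time budget is exhausted
            return System.currentTimeMillis() > deadline;
        }
    };
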
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java	(date 1464767926000)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java	(date 1464767926000)
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.api.Type.BINARIES;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.hash.Hashing;
+
+/**
+ * Tool for compacting segments.
+ */
+public class Compactor {
+
+    /** Logger instance */
+    private static final Logger log = LoggerFactory.getLogger(Compactor.class);
+
+    private final SegmentReader reader;
+
+    private final BlobStore blobStore;
+
+    private final SegmentWriter writer;
+
+    /**
+     * Filter deciding which nodes are included in the compaction map,
+     * allowing for optimizations in the case of offline compaction.
+     */
+    private final Predicate<NodeState> includeInMap = new OfflineCompactionPredicate();
+
+    private final ProgressTracker progress = new ProgressTracker();
+
+    /**
+     * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted blob
+     * record identifiers. Used to de-duplicate copies of the same binary
+     * values.
+     */
+    private final Map<String, List<RecordId>> binaries = newHashMap();
+
+    /**
+     * Flag enabling a content equality check before compacting a changed
+     * child node on the childNodeChanged diff branch (used in the backup
+     * scenario).
+     */
+    private boolean contentEqualityCheck;
+
+    /**
+     * Allows the cancellation of the compaction process. If this
+     * {@code Supplier} returns {@code true}, this compactor will cancel
+     * compaction and return a partial {@code SegmentNodeState} containing the
+     * changes compacted before the cancellation.
+     */
+    private final Supplier<Boolean> cancel;
+
+    // TODO add OOME safety
+    private final Map<RecordId, RecordId> map = new HashMap<RecordId, RecordId>(
+            Integer.getInteger("compress-interval", 100000));
+
+    public Compactor(SegmentReader reader, SegmentWriter writer, BlobStore blobStore,
+            Supplier<Boolean> cancel) {
+        this.reader = reader;
+        this.writer = writer;
+        this.blobStore = blobStore;
+        this.cancel = cancel;
+    }
+
+    private SegmentNodeBuilder process(NodeState before, NodeState after,
+            NodeState onto) throws IOException {
+        SegmentNodeBuilder builder = new SegmentNodeBuilder(
+                writer.writeNode(onto), writer);
+        new CompactDiff(builder).diff(before, after);
+        return builder;
+    }
+
+    /**
+     * Compacts the differences between the {@code before} and the
+     * {@code after} state on top of the {@code onto} state.
+     *
+     * @param before the before state
+     * @param after the after state
+     * @param onto the onto state
+     * @return the compacted state
+     * @throws IOException if writing the compacted state fails
+     */
+    public SegmentNodeState compact(NodeState before, NodeState after,
+            NodeState onto) throws IOException {
+        progress.start();
+        SegmentNodeState compacted = process(before, after, onto)
+                .getNodeState();
+        writer.flush();
+        progress.stop();
+        return compacted;
+    }
+
+    private class CompactDiff extends ApplyDiff {
+        private IOException exception;
+
+        /**
+         * Path of the node currently being processed, or {@code null} if
+         * trace logging was disabled when the compaction call started. The
+         * {@code null} check is also used to decide whether trace messages
+         * need to be produced at all.
+         */
+        private final String path;
+
+        CompactDiff(NodeBuilder builder) {
+            super(builder);
+            if (log.isTraceEnabled()) {
+                this.path = "/";
+            } else {
+                this.path = null;
+            }
+        }
+
+        private CompactDiff(NodeBuilder builder, String path, String childName) {
+            super(builder);
+            if (path != null) {
+                this.path = concat(path, childName);
+            } else {
+                this.path = null;
+            }
+        }
+
+        boolean diff(NodeState before, NodeState after) throws IOException {
+            boolean success = after.compareAgainstBaseState(before,
+                    new CancelableDiff(this, cancel));
+            if (exception != null) {
+                throw new IOException(exception);
+            }
+            return success;
+        }
+
+        @Override
+        public boolean propertyAdded(PropertyState after) {
+            if (path != null) {
+                log.trace("propertyAdded {}/{}", path, after.getName());
+            }
+            progress.onProperty();
+            return super.propertyAdded(compact(after));
+        }
+
+        @Override
+        public boolean propertyChanged(PropertyState before, PropertyState after) {
+            if (path != null) {
+                log.trace("propertyChanged {}/{}", path, after.getName());
+            }
+            progress.onProperty();
+            return super.propertyChanged(before, compact(after));
+        }
+
+        @Override
+        public boolean childNodeAdded(String name, NodeState after) {
+            if (path != null) {
+                log.trace("childNodeAdded {}/{}", path, name);
+            }
+
+            RecordId id = null;
+            if (after instanceof SegmentNodeState) {
+                id = ((SegmentNodeState) after).getRecordId();
+                RecordId compactedId = map.get(id);
+                if (compactedId != null) {
+                    builder.setChildNode(name, new SegmentNodeState(reader, writer, compactedId));
+                    return true;
+                }
+            }
+
+            progress.onNode();
+            try {
+                NodeBuilder child = EMPTY_NODE.builder();
+                boolean success = new CompactDiff(child, path, name).diff(
+                        EMPTY_NODE, after);
+                if (success) {
+                    SegmentNodeState state = writer.writeNode(child.getNodeState());
+                    builder.setChildNode(name, state);
+                    if (id != null && includeInMap.apply(after)) {
+                        map.put(id, state.getRecordId());
+                    }
+                }
+                return success;
+            } catch (IOException e) {
+                exception = e;
+                return false;
+            }
+        }
+
+        @Override
+        public boolean childNodeChanged(String name, NodeState before,
+                NodeState after) {
+            if (path != null) {
+                log.trace("childNodeChanged {}/{}", path, name);
+            }
+
+            RecordId id = null;
+            if (after instanceof SegmentNodeState) {
+                id = ((SegmentNodeState) after).getRecordId();
+                RecordId compactedId = map.get(id);
+                if (compactedId != null) {
+                    builder.setChildNode(name, new SegmentNodeState(reader, writer, compactedId));
+                    return true;
+                }
+            }
+
+            if (contentEqualityCheck && before.equals(after)) {
+                return true;
+            }
+
+            progress.onNode();
+            try {
+                NodeBuilder child = builder.getChildNode(name);
+                boolean success = new CompactDiff(child, path, name).diff(
+                        before, after);
+                if (success) {
+                    RecordId compactedId = writer.writeNode(
+                            child.getNodeState()).getRecordId();
+                    if (id != null) {
+                        map.put(id, compactedId);
+                    }
+                }
+                return success;
+            } catch (IOException e) {
+                exception = e;
+                return false;
+            }
+        }
+    }
+
+    private PropertyState compact(PropertyState property) {
+        String name = property.getName();
+        Type<?> type = property.getType();
+        if (type == BINARY) {
+            Blob blob = compact(property.getValue(Type.BINARY));
+            return BinaryPropertyState.binaryProperty(name, blob);
+        } else if (type == BINARIES) {
+            List<Blob> blobs = new ArrayList<Blob>();
+            for (Blob blob : property.getValue(BINARIES)) {
+                blobs.add(compact(blob));
+            }
+            return MultiBinaryPropertyState.binaryPropertyFromBlob(name, blobs);
+        } else {
+            Object value = property.getValue(type);
+            return PropertyStates.createProperty(name, value, type);
+        }
+    }
+
+    /**
+     * Compacts (and de-duplicates) the given blob.
+     *
+     * @param blob blob to be compacted
+     * @return compacted blob
+     */
+    private Blob compact(Blob blob) {
+        if (blob instanceof SegmentBlob) {
+            SegmentBlob sb = (SegmentBlob) blob;
+            try {
+                // Check if we've already cloned this specific record
+                RecordId id = sb.getRecordId();
+                RecordId compactedId = map.get(id);
+                if (compactedId != null) {
+                    return new SegmentBlob(blobStore, compactedId);
+                }
+
+                progress.onBinary();
+
+                // if the blob is external, just clone the reference
+                if (sb.isExternal()) {
+                    SegmentBlob clone = writer.writeBlobRef(sb.getBlobId());
+                    map.put(id, clone.getRecordId());
+                    return clone;
+                }
+
+                // if the blob is inlined, just clone it
+                if (sb.length() < Segment.MEDIUM_LIMIT) {
+                    SegmentBlob clone = writer.writeBlob(blob);
+                    map.put(id, clone.getRecordId());
+                    return clone;
+                }
+
+                // alternatively look if the exact same binary has been cloned
+                String key = getBlobKey(blob);
+                List<RecordId> ids = binaries.get(key);
+                if (ids != null) {
+                    for (RecordId duplicateId : ids) {
+                        if (new SegmentBlob(blobStore, duplicateId).equals(sb)) {
+                            map.put(id, duplicateId);
+                            return new SegmentBlob(blobStore, duplicateId);
+                        }
+                    }
+                }
+
+                // if not, clone the large blob and keep track of the result
+                sb = writer.writeBlob(blob);
+
+                map.put(id, sb.getRecordId());
+                if (ids == null) {
+                    ids = newArrayList();
+                    binaries.put(key, ids);
+                }
+                ids.add(sb.getRecordId());
+
+                return sb;
+            } catch (IOException e) {
+                log.warn("Failed to compact a blob", e);
+                // fall through
+            }
+        }
+
+        // no way to compact this blob, so we'll just keep it as-is
+        return blob;
+    }
+
+    private static String getBlobKey(Blob blob) throws IOException {
+        InputStream stream = blob.getNewStream();
+        try {
+            byte[] buffer = new byte[SegmentWriter.BLOCK_SIZE];
+            int n = IOUtils.readFully(stream, buffer, 0, buffer.length);
+            return blob.length() + ":" + Hashing.sha1().hashBytes(buffer, 0, n);
+        } finally {
+            stream.close();
+        }
+    }
+
+    private static class ProgressTracker {
+        private final long logAt = Long.getLong("compaction-progress-log",
+                150000);
+
+        private long start = 0;
+
+        private long nodes = 0;
+        private long properties = 0;
+        private long binaries = 0;
+
+        void start() {
+            nodes = 0;
+            properties = 0;
+            binaries = 0;
+            start = System.currentTimeMillis();
+        }
+
+        void onNode() {
+            if (++nodes % logAt == 0) {
+                logProgress(start, false);
+                start = System.currentTimeMillis();
+            }
+        }
+
+        void onProperty() {
+            properties++;
+        }
+
+        void onBinary() {
+            binaries++;
+        }
+
+        void stop() {
+            logProgress(start, true);
+        }
+
+        private void logProgress(long start, boolean done) {
+            log.debug(
+                    "Compacted {} nodes, {} properties, {} binaries in {} ms.",
+                    nodes, properties, binaries, System.currentTimeMillis() - start);
+            if (done) {
+                log.info(
+                        "Finished compaction: {} nodes, {} properties, {} binaries.",
+                        nodes, properties, binaries);
+            }
+        }
+    }
+
+    private static class OfflineCompactionPredicate implements
+            Predicate<NodeState> {
+
+        /**
+         * Nodes with a content footprint over 64KB are included in the
+         * compaction map.
+         */
+        private static final long offlineThreshold = 65536;
+
+        @Override
+        public boolean apply(NodeState state) {
+            if (state.getChildNodeCount(2) > 1) {
+                return true;
+            }
+            long count = 0;
+            for (PropertyState ps : state.getProperties()) {
+                Type<?> type = ps.getType();
+                for (int i = 0; i < ps.count(); i++) {
+                    long size = 0;
+                    if (type == BINARY || type == BINARIES) {
+                        Blob blob = ps.getValue(BINARY, i);
+                        if (blob instanceof SegmentBlob) {
+                            if (!((SegmentBlob) blob).isExternal()) {
+                                size += blob.length();
+                            }
+                        } else {
+                            size += blob.length();
+                        }
+                    } else {
+                        size = ps.size(i);
+                    }
+                    count += size;
+                    if (size >= offlineThreshold || count >= offlineThreshold) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+    }
+
+    public void setContentEqualityCheck(boolean contentEqualityCheck) {
+        this.contentEqualityCheck = contentEqualityCheck;
+    }
+
+    interface PartialCompactionMap {
+
+        /**
+         * Retrieve the record id {@code before} maps to or {@code null} if no
+         * such id exists.
+         *
+         * @param before before record id
+         * @return after record id or {@code null}
+         */
+        @CheckForNull
+        RecordId get(@Nonnull RecordId before);
+
+        /**
+         * Adds a new entry to the compaction map. Overwriting a previously
+         * added entry is not supported.
+         *
+         * @param before before record id
+         * @param after after record id
+         * @throws IllegalArgumentException
+         *             if {@code before} already exists in the map
+         */
+        void put(@Nonnull RecordId before, @Nonnull RecordId after);
+    }
+
+}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java	(date 1464767331000)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java	(date 1464767926000)
@@ -206,6 +206,27 @@
     }
 
     /**
+     * Writes a reference to an external blob without loading its content,
+     * which also works when no external binary store is attached.
+     *
+     * @param blob the blob id of the external blob to reference
+     * @return the segment blob written
+     * @throws IOException if writing the reference fails
+     */
+    public SegmentBlob writeBlobRef(@Nonnull final String blob) throws IOException {
+        RecordId blobId = writeOperationHandler
+                .execute(new SegmentWriteOperation() {
+                    @Override
+                    public RecordId execute(SegmentBufferWriter writer)
+                            throws IOException {
+                        return with(writer).writeBlobId(blob);
+                    }
+                });
+        return new SegmentBlob(blobStore, blobId);
+    }
+
+    /**
      * Writes a block record containing the given block of bytes.
      *
      * @param bytes source buffer
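
Editor's note: with the Compactor and the writeBlobRef helper above in place (writeBlobRef is what the Compactor uses to clone references to external binaries without resolving them), the compactor can be driven directly against a store's reader, writer and blob store, which is what CompactorTest at the end of this patch does. A minimal sketch, where head stands for the current root NodeState and EMPTY_NODE comes from EmptyNodeState:

    Compactor compactor = new Compactor(reader, writer, blobStore,
            Suppliers.ofInstance(false));   // cancellation supplier that never cancels
    SegmentNodeState compacted = compactor.compact(EMPTY_NODE, head, EMPTY_NODE);
    // compact() flushes the writer before returning the compacted root
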
Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java	(date 1464767331000)
+++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java	(date 1464767926000)
@@ -169,6 +169,92 @@
         }
     }
 
+    @Test
+    public void offlineCompaction()
+            throws IOException, CommitFailedException {
+        FileStore fileStore = FileStore.builder(getFileStoreFolder())
+                .withGCOptions(DEFAULT.setRetainedGenerations(2))
+                .withMaxFileSize(1)
+                .build();
+        fileStore.getGcOptions().setOffline(true);
+        SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+
+        try {
+            // 5MB blob
+            int blobSize = 5 * 1024 * 1024;
+
+            // Create ~2MB of data
+            NodeBuilder extra = nodeStore.getRoot().builder();
+            NodeBuilder content = extra.child("content");
+            for (int i = 0; i < 10000; i++) {
+                NodeBuilder c = content.child("c" + i);
+                for (int j = 0; j < 1000; j++) {
+                    c.setProperty("p" + i, "v" + i);
+                }
+            }
+            nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size1 = fileStore.size();
+            log.debug("File store size {}", byteCountToDisplaySize(size1));
+
+            // Create a property with a 5 MB blob
+            NodeBuilder builder = nodeStore.getRoot().builder();
+            builder.setProperty("blob1", createBlob(nodeStore, blobSize));
+            nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size2 = fileStore.size();
+            assertSize("1st blob added", size2, size1 + blobSize, size1 + blobSize + (blobSize / 100));
+
+            // Now remove the property. No gc yet -> size doesn't shrink
+            builder = nodeStore.getRoot().builder();
+            builder.removeProperty("blob1");
+            nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size3 = fileStore.size();
+            assertSize("1st blob removed", size3, size2, size2 + 4096);
+
+            // 1st gc cycle -> no reclaimable garbage...
+            fileStore.compact();
+            fileStore.cleanup();
+
+            long size4 = fileStore.size();
+            assertSize("1st gc", size4, size3, size3 + size1);
+
+            // Add another 5MB blob, doubling the binary content
+            builder = nodeStore.getRoot().builder();
+            builder.setProperty("blob2", createBlob(nodeStore, blobSize));
+            nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size5 = fileStore.size();
+            assertSize("2nd blob added", size5, size4 + blobSize, size4 + blobSize + (blobSize / 100));
+
+            // 2nd gc cycle -> the 1st blob should get collected
+            fileStore.compact();
+            fileStore.cleanup();
+
+            long size6 = fileStore.size();
+            assertSize("2nd gc", size6, size5 - blobSize - size1, size5 - blobSize);
+
+            // 3rd gc cycle -> no significant change
+            fileStore.compact();
+            fileStore.cleanup();
+
+            long size7 = fileStore.size();
+            assertSize("3rd gc", size7, size6 * 10 / 11, size6 * 10 / 9);
+
+            // No data loss
+            byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot()
+                    .getProperty("blob2").getValue(Type.BINARY).getNewStream());
+            assertEquals(blobSize, blob.length);
+        } finally {
+            fileStore.close();
+        }
+    }
+
     private static void assertSize(String info, long size, long lower, long upper) {
         log.debug("File Store {} size {}, expected in interval [{},{}]",
                 info, size, lower, upper);
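
Editor's note: the test relies on a createBlob helper that already exists in this test class and is therefore not part of the diff. A typical implementation creates an incompressible blob of the requested size; the following is a hypothetical reconstruction for illustration only, not the actual helper:

    private static Blob createBlob(NodeStore nodeStore, int size) throws IOException {
        byte[] data = new byte[size];
        new Random().nextBytes(data);   // random, hence incompressible, content
        return nodeStore.createBlob(new ByteArrayInputStream(data));
    }
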
Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java	(date 1464767331000)
+++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java	(date 1464767926000)
@@ -18,8 +18,13 @@
  */
 package org.apache.jackrabbit.oak.segment;
 
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 import java.io.IOException;
 
+import com.google.common.base.Suppliers;
 import org.apache.jackrabbit.oak.Oak;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
@@ -30,14 +35,51 @@
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.Before;
+import org.junit.Test;
 
 public class CompactorTest {
 
-    private SegmentStore segmentStore;
+    private MemoryStore memoryStore;
 
     @Before
     public void openSegmentStore() throws IOException {
-        segmentStore = new MemoryStore();
+        memoryStore = new MemoryStore();
+    }
+
+    @Test
+    public void testCompactor() throws Exception {
+        NodeStore store = SegmentNodeStoreBuilders.builder(memoryStore).build();
+        init(store);
+
+        SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1);
+        Compactor compactor = new Compactor(memoryStore.getReader(), writer,
+                memoryStore.getBlobStore(), Suppliers.ofInstance(false));
+        addTestContent(store, 0);
+
+        NodeState initial = store.getRoot();
+        SegmentNodeState after = compactor.compact(initial, store.getRoot(),
+                initial);
+        assertEquals(store.getRoot(), after);
+
+        addTestContent(store, 1);
+        after = compactor.compact(initial, store.getRoot(), initial);
+        assertEquals(store.getRoot(), after);
+    }
+
+    @Test
+    public void testCancel() throws Throwable {
+
+        // Create a Compactor that will cancel itself as soon as possible. The
+        // early cancellation is the reason why the returned SegmentNodeState
+        // doesn't have the child named "b".
+
+        NodeStore store = SegmentNodeStoreBuilders.builder(memoryStore).build();
+        SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1);
+        Compactor compactor = new Compactor(memoryStore.getReader(), writer, memoryStore.getBlobStore(),
+                Suppliers.ofInstance(true));
+        SegmentNodeState sns = compactor.compact(store.getRoot(),
+                addChild(store.getRoot(), "b"), store.getRoot());
+        assertFalse(sns.hasChildNode("b"));
     }
 
     private NodeState addChild(NodeState current, String name) {