diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index 580febc..bbaf273 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -760,6 +760,7 @@ public class FileStore implements SegmentStore, Closeable { Set bulkRefs = newHashSet(); Map cleaned = newLinkedHashMap(); + System.out.println("Pre cleanup readers: " + readers); fileStoreLock.writeLock().lock(); try { gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT); @@ -782,11 +783,7 @@ public class FileStore implements SegmentStore, Closeable { } finally { fileStoreLock.writeLock().unlock(); } - - // compute initial size here to better reflect repository size after the previous tar writer was closed - long initialSize = size(); - gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)", - GC_COUNT, humanReadableByteCount(initialSize), initialSize); + System.out.println("Before cleanup readers: " + readers); Set reclaim = newHashSet(); for (TarReader reader : cleaned.keySet()) { @@ -806,6 +803,9 @@ public class FileStore implements SegmentStore, Closeable { break; } } + + // compute initial size here to better reflect repository size after the previous tar writer was closed + long initialSize = 0; // it doesn't account for concurrent commits that might have happened long afterCleanupSize = 0; @@ -827,6 +827,9 @@ public class FileStore implements SegmentStore, Closeable { if (newReader != reader) { oldReaders.add(reader); } + + //incrementally compute initialSize as a sum of initial readers' sizes + initialSize += reader.size(); } else { sweptReaders.add(reader); } @@ -835,8 +838,14 @@ public class FileStore implements SegmentStore, Closeable { } finally { fileStoreLock.writeLock().unlock(); } + + gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)", + GC_COUNT, humanReadableByteCount(initialSize), initialSize); + System.out.println("Initial size: "+ humanReadableByteCount(initialSize)); + tracker.clearSegmentIdTables(reclaimed, gcInfo); + System.out.println("After cleanup readers: " + readers); // Close old readers *after* setting readers to the new readers to avoid accessing // a closed reader from readSegment() LinkedList toRemove = newLinkedList(); @@ -848,6 +857,8 @@ public class FileStore implements SegmentStore, Closeable { } long finalSize = size(); + System.out.println("After cleanup size: " + humanReadableByteCount(afterCleanupSize)); + System.out.println("Final size: " + humanReadableByteCount(finalSize)); long reclaimedSize = initialSize - afterCleanupSize; stats.reclaimed(reclaimedSize); gcJournal.persist(reclaimedSize, finalSize); diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java index 2f6abca..31fdc9b 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java @@ -815,7 +815,9 @@ class TarReader implements Closeable { log.debug("Cleaning up {}", name); Set cleaned = newHashSet(); - int size = 0; + int sweptEntriesSize = 0; + int currentEntriesSize = 0; + int count = 0; TarEntry[] entries = getEntries(); for (int i = 0; i < entries.length; i++) { @@ -825,25 +827,25 @@ class TarReader implements Closeable { cleaned.add(id); entries[i] = null; } else { - size += getEntrySize(entry.size()); + sweptEntriesSize += getEntrySize(entry.size()); count += 1; } + + currentEntriesSize += getEntrySize(entry.size());; } - size += getEntrySize(TarEntry.SIZE * count + 16); - size += 2 * BLOCK_SIZE; - + if (count == 0) { log.debug("None of the entries of {} are referenceable.", name); logCleanedSegments(cleaned); return null; } - if (size >= access.length() * 3 / 4 && hasGraph()) { + if (sweptEntriesSize >= currentEntriesSize * 3 / 4 && hasGraph()) { // the space savings are not worth it at less than 25%, // unless this tar file lacks a pre-compiled segment graph // in which case we'll always generate a new tar file with // the graph to speed up future garbage collection runs. log.debug("Not enough space savings. ({}/{}). Skipping clean up of {}", - access.length() - size, access.length(), name); + access.length() - sweptEntriesSize, access.length(), name); return this; } if (!hasGraph()) { diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java index 4a7fd80..99c3935 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java @@ -25,8 +25,8 @@ import static java.lang.Integer.getInteger; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.io.FileUtils.byteCountToDisplaySize; import static org.apache.jackrabbit.oak.api.Type.STRING; -import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK; import static org.apache.jackrabbit.oak.commons.FixturesHelper.getFixtures; +import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions; import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; @@ -46,32 +46,38 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.io.ByteStreams; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreGCMonitor; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.apache.jackrabbit.oak.stats.Clock; import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; +import org.apache.jackrabbit.oak.stats.StatisticsProvider; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.io.ByteStreams; + public class CompactionAndCleanupIT { private static final Logger log = LoggerFactory @@ -88,837 +94,896 @@ public class CompactionAndCleanupIT { assumeTrue(getFixtures().contains(SEGMENT_MK)); } - @Test - public void compactionNoBinaryClone() throws Exception { - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) - .withGCOptions(defaultGCOptions().setRetainedGenerations(2)) - .withStatisticsProvider(new DefaultStatisticsProvider(executor)) - .withMaxFileSize(1) - .build(); - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - - try { - // 5MB blob - int blobSize = 5 * 1024 * 1024; - - // Create ~2MB of data - NodeBuilder extra = nodeStore.getRoot().builder(); - NodeBuilder content = extra.child("content"); - for (int i = 0; i < 10000; i++) { - NodeBuilder c = content.child("c" + i); - for (int j = 0; j < 1000; j++) { - c.setProperty("p" + i, "v" + i); - } - } - nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size1 = fileStore.getStats().getApproximateSize(); - log.debug("File store size {}", byteCountToDisplaySize(size1)); - - // Create a property with 5 MB blob - NodeBuilder builder = nodeStore.getRoot().builder(); - builder.setProperty("blob1", createBlob(nodeStore, blobSize)); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size2 = fileStore.getStats().getApproximateSize(); - assertSize("1st blob added", size2, size1 + blobSize, size1 + blobSize + (blobSize / 100)); +// @Test +// public void compactionNoBinaryClone() throws Exception { +// ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withGCOptions(defaultGCOptions().setRetainedGenerations(2)) +// .withStatisticsProvider(new DefaultStatisticsProvider(executor)) +// .withMaxFileSize(1) +// .build(); +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); +// +// try { +// // 5MB blob +// int blobSize = 5 * 1024 * 1024; +// +// // Create ~2MB of data +// NodeBuilder extra = nodeStore.getRoot().builder(); +// NodeBuilder content = extra.child("content"); +// for (int i = 0; i < 10000; i++) { +// NodeBuilder c = content.child("c" + i); +// for (int j = 0; j < 1000; j++) { +// c.setProperty("p" + i, "v" + i); +// } +// } +// nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size1 = fileStore.getStats().getApproximateSize(); +// log.debug("File store size {}", byteCountToDisplaySize(size1)); +// +// // Create a property with 5 MB blob +// NodeBuilder builder = nodeStore.getRoot().builder(); +// builder.setProperty("blob1", createBlob(nodeStore, blobSize)); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size2 = fileStore.getStats().getApproximateSize(); +// assertSize("1st blob added", size2, size1 + blobSize, size1 + blobSize + (blobSize / 100)); +// +// // Now remove the property. No gc yet -> size doesn't shrink +// builder = nodeStore.getRoot().builder(); +// builder.removeProperty("blob1"); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size3 = fileStore.getStats().getApproximateSize(); +// assertSize("1st blob removed", size3, size2, size2 + 4096); +// +// // 1st gc cycle -> no reclaimable garbage... +// fileStore.compact(); +// fileStore.cleanup(); +// +// long size4 = fileStore.getStats().getApproximateSize(); +// assertSize("1st gc", size4, size3, size3 + size1); +// +// // Add another 5MB binary doubling the blob size +// builder = nodeStore.getRoot().builder(); +// builder.setProperty("blob2", createBlob(nodeStore, blobSize)); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size5 = fileStore.getStats().getApproximateSize(); +// assertSize("2nd blob added", size5, size4 + blobSize, size4 + blobSize + (blobSize / 100)); +// +// // 2st gc cycle -> 1st blob should get collected +// fileStore.compact(); +// fileStore.cleanup(); +// +// long size6 = fileStore.getStats().getApproximateSize(); +// assertSize("2nd gc", size6, size5 - blobSize - size1, size5 - blobSize); +// +// // 3rtd gc cycle -> no significant change +// fileStore.compact(); +// fileStore.cleanup(); +// +// long size7 = fileStore.getStats().getApproximateSize(); +// assertSize("3rd gc", size7, size6 * 10/11 , size6 * 10/9); +// +// // No data loss +// byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot() +// .getProperty("blob2").getValue(Type.BINARY).getNewStream()); +// assertEquals(blobSize, blob.length); +// } finally { +// fileStore.close(); +// } +// } +// +// @Test +// public void offlineCompaction() throws Exception { +// SegmentGCOptions gcOptions = defaultGCOptions().setOffline(); +// ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withMaxFileSize(1) +// .withGCOptions(gcOptions) +// .withStatisticsProvider(new DefaultStatisticsProvider(executor)) +// .build(); +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); +// +// try { +// // 5MB blob +// int blobSize = 5 * 1024 * 1024; +// +// // Create ~2MB of data +// NodeBuilder extra = nodeStore.getRoot().builder(); +// NodeBuilder content = extra.child("content"); +// for (int i = 0; i < 10000; i++) { +// NodeBuilder c = content.child("c" + i); +// for (int j = 0; j < 1000; j++) { +// c.setProperty("p" + i, "v" + i); +// } +// } +// nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size1 = fileStore.getStats().getApproximateSize(); +// log.debug("File store size {}", byteCountToDisplaySize(size1)); +// +// // Create a property with 5 MB blob +// NodeBuilder builder = nodeStore.getRoot().builder(); +// builder.setProperty("blob1", createBlob(nodeStore, blobSize)); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size2 = fileStore.getStats().getApproximateSize(); +// assertSize("1st blob added", size2, size1 + blobSize, size1 + blobSize + (blobSize / 100)); +// +// // Now remove the property. No gc yet -> size doesn't shrink +// builder = nodeStore.getRoot().builder(); +// builder.removeProperty("blob1"); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size3 = fileStore.getStats().getApproximateSize(); +// assertSize("1st blob removed", size3, size2, size2 + 4096); +// +// // 1st gc cycle -> 1st blob should get collected +// fileStore.compact(); +// fileStore.cleanup(); +// +// long size4 = fileStore.getStats().getApproximateSize(); +// assertSize("1st gc", size4, size3 - blobSize - size1, size3 +// - blobSize); +// +// // Add another 5MB binary +// builder = nodeStore.getRoot().builder(); +// builder.setProperty("blob2", createBlob(nodeStore, blobSize)); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// long size5 = fileStore.getStats().getApproximateSize(); +// assertSize("2nd blob added", size5, size4 + blobSize, size4 + blobSize + (blobSize / 100)); +// +// // 2st gc cycle -> 2nd blob should *not* be collected +// fileStore.compact(); +// fileStore.cleanup(); +// +// long size6 = fileStore.getStats().getApproximateSize(); +// assertSize("2nd gc", size6, size5 * 10/11, size5 * 10/9); +// +// // 3rd gc cycle -> no significant change +// fileStore.compact(); +// fileStore.cleanup(); +// +// long size7 = fileStore.getStats().getApproximateSize(); +// assertSize("3rd gc", size7, size6 * 10/11 , size6 * 10/9); +// +// // No data loss +// byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot() +// .getProperty("blob2").getValue(Type.BINARY).getNewStream()); +// assertEquals(blobSize, blob.length); +// } finally { +// fileStore.close(); +// } +// } +// +// /** +// * Create a lot of data nodes (no binaries) and a few checkpoints, verify +// * that compacting checkpoints will not cause the size to explode +// */ +// @Test +// public void offlineCompactionCps() throws Exception { +// SegmentGCOptions gcOptions = defaultGCOptions().setOffline(); +// ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withMaxFileSize(1) +// .withGCOptions(gcOptions) +// .withStatisticsProvider(new DefaultStatisticsProvider(executor)) +// .build(); +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); +// try { +// // Create ~2MB of data +// NodeBuilder extra = nodeStore.getRoot().builder(); +// NodeBuilder content = extra.child("content"); +// for (int i = 0; i < 10000; i++) { +// NodeBuilder c = content.child("c" + i); +// for (int j = 0; j < 1000; j++) { +// c.setProperty("p" + i, "v" + i); +// } +// } +// nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// fileStore.compact(); +// fileStore.cleanup(); +// // Compacts to 548Kb +// long size0 = fileStore.getStats().getApproximateSize(); +// +// int cpNo = 4; +// Set cps = new HashSet(); +// for (int i = 0; i < cpNo; i++) { +// cps.add(nodeStore.checkpoint(60000)); +// } +// assertEquals(cpNo, cps.size()); +// for (String cp : cps) { +// assertTrue(nodeStore.retrieve(cp) != null); +// } +// +// long size1 = fileStore.getStats().getApproximateSize(); +// assertSize("with checkpoints added", size1, size0, size0 * 11 / 10); +// fileStore.compact(); +// fileStore.cleanup(); +// long size2 = fileStore.getStats().getApproximateSize(); +// assertSize("with checkpoints compacted", size2, size1 * 9/10, size1 * 11 / 10); +// } finally { +// fileStore.close(); +// } +// } +// +// /** +// * Create 2 binary nodes with same content but not same reference. Verify +// * de-duplication capabilities of compaction. +// */ +// @Test +// public void offlineCompactionBinC1() throws Exception { +// SegmentGCOptions gcOptions = defaultGCOptions().setOffline() +// .withBinaryDeduplication(); +// ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withMaxFileSize(1) +// .withGCOptions(gcOptions) +// .withStatisticsProvider(new DefaultStatisticsProvider(executor)) +// .build(); +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders +// .builder(fileStore).build(); +// +// try { +// NodeBuilder extra = nodeStore.getRoot().builder(); +// NodeBuilder content = extra.child("content"); +// +// int blobSize = 5 * 1024 * 1024; +// byte[] data = new byte[blobSize]; +// new Random().nextBytes(data); +// +// NodeBuilder c1 = content.child("c1"); +// Blob b1 = nodeStore.createBlob(new ByteArrayInputStream(data)); +// c1.setProperty("blob1", b1); +// NodeBuilder c2 = content.child("c2"); +// Blob b2 = nodeStore.createBlob(new ByteArrayInputStream(data)); +// c2.setProperty("blob2", b2); +// nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// int cpNo = 4; +// Set cps = new HashSet(); +// for (int i = 0; i < cpNo; i++) { +// cps.add(nodeStore.checkpoint(60000)); +// } +// assertEquals(cpNo, cps.size()); +// for (String cp : cps) { +// assertTrue(nodeStore.retrieve(cp) != null); +// } +// +// long size1 = fileStore.getStats().getApproximateSize(); +// fileStore.compact(); +// fileStore.cleanup(); +// long size2 = fileStore.getStats().getApproximateSize(); +// assertSize("with compacted binaries", size2, 0, size1 - blobSize); +// } finally { +// fileStore.close(); +// } +// } +// +// /** +// * Create 2 binary nodes with same content but not same reference. Reduce +// * the max size if de-duplicated binaries under the binary length. Verify +// * de-duplication capabilities of compaction. +// */ +// @Test +// public void offlineCompactionBinC2() throws Exception { +// int blobSize = 5 * 1024 * 1024; +// +// SegmentGCOptions gcOptions = defaultGCOptions().setOffline() +// .withBinaryDeduplication() +// .setBinaryDeduplicationMaxSize(blobSize / 2); +// ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withMaxFileSize(1) +// .withGCOptions(gcOptions) +// .withStatisticsProvider(new DefaultStatisticsProvider(executor)) +// .build(); +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders +// .builder(fileStore).build(); +// +// try { +// NodeBuilder extra = nodeStore.getRoot().builder(); +// NodeBuilder content = extra.child("content"); +// +// byte[] data = new byte[blobSize]; +// new Random().nextBytes(data); +// +// NodeBuilder c1 = content.child("c1"); +// Blob b1 = nodeStore.createBlob(new ByteArrayInputStream(data)); +// c1.setProperty("blob1", b1); +// NodeBuilder c2 = content.child("c2"); +// Blob b2 = nodeStore.createBlob(new ByteArrayInputStream(data)); +// c2.setProperty("blob2", b2); +// nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// int cpNo = 4; +// Set cps = new HashSet(); +// for (int i = 0; i < cpNo; i++) { +// cps.add(nodeStore.checkpoint(60000)); +// } +// assertEquals(cpNo, cps.size()); +// for (String cp : cps) { +// assertTrue(nodeStore.retrieve(cp) != null); +// } +// +// long size1 = fileStore.getStats().getApproximateSize(); +// fileStore.compact(); +// fileStore.cleanup(); +// long size2 = fileStore.getStats().getApproximateSize(); +// +// // not expected to reduce the size too much, as the binaries are +// // above the threshold +// assertSize("with compacted binaries", size2, size1 * 9 / 10, +// size1 * 11 / 10); +// } finally { +// fileStore.close(); +// } +// } +// +// /** +// * Create 2 binary nodes with same content and same reference. Verify +// * de-duplication capabilities of compaction +// */ +// @Test +// public void offlineCompactionBinR1() throws Exception { +// SegmentGCOptions gcOptions = defaultGCOptions().setOffline(); +// ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withMaxFileSize(1) +// .withGCOptions(gcOptions) +// .withStatisticsProvider(new DefaultStatisticsProvider(executor)) +// .build(); +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders +// .builder(fileStore).build(); +// +// try { +// NodeBuilder extra = nodeStore.getRoot().builder(); +// NodeBuilder content = extra.child("content"); +// +// int blobSize = 5 * 1024 * 1024; +// byte[] data = new byte[blobSize]; +// new Random().nextBytes(data); +// Blob b = nodeStore.createBlob(new ByteArrayInputStream(data)); +// +// NodeBuilder c1 = content.child("c1"); +// c1.setProperty("blob1", b); +// NodeBuilder c2 = content.child("c2"); +// c2.setProperty("blob2", b); +// nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// int cpNo = 4; +// Set cps = new HashSet(); +// for (int i = 0; i < cpNo; i++) { +// cps.add(nodeStore.checkpoint(60000)); +// } +// assertEquals(cpNo, cps.size()); +// for (String cp : cps) { +// assertTrue(nodeStore.retrieve(cp) != null); +// } +// +// // 5Mb, de-duplication by the SegmentWriter +// long size1 = fileStore.getStats().getApproximateSize(); +// fileStore.compact(); +// fileStore.cleanup(); +// long size2 = fileStore.getStats().getApproximateSize(); +// assertSize("with compacted binaries", size2, 0, size1 * 11 / 10); +// } finally { +// fileStore.close(); +// } +// } +// +// private static void assertSize(String info, long size, long lower, long upper) { +// log.debug("File Store {} size {}, expected in interval [{},{}]", +// info, size, lower, upper); +// assertTrue("File Store " + info + " size expected in interval " + +// "[" + (lower) + "," + (upper) + "] but was: " + (size), +// size >= lower && size <= (upper)); +// } - // Now remove the property. No gc yet -> size doesn't shrink - builder = nodeStore.getRoot().builder(); - builder.removeProperty("blob1"); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size3 = fileStore.getStats().getApproximateSize(); - assertSize("1st blob removed", size3, size2, size2 + 4096); - - // 1st gc cycle -> no reclaimable garbage... - fileStore.compact(); - fileStore.cleanup(); - - long size4 = fileStore.getStats().getApproximateSize(); - assertSize("1st gc", size4, size3, size3 + size1); - - // Add another 5MB binary doubling the blob size - builder = nodeStore.getRoot().builder(); - builder.setProperty("blob2", createBlob(nodeStore, blobSize)); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size5 = fileStore.getStats().getApproximateSize(); - assertSize("2nd blob added", size5, size4 + blobSize, size4 + blobSize + (blobSize / 100)); - - // 2st gc cycle -> 1st blob should get collected - fileStore.compact(); - fileStore.cleanup(); - - long size6 = fileStore.getStats().getApproximateSize(); - assertSize("2nd gc", size6, size5 - blobSize - size1, size5 - blobSize); - - // 3rtd gc cycle -> no significant change - fileStore.compact(); - fileStore.cleanup(); - - long size7 = fileStore.getStats().getApproximateSize(); - assertSize("3rd gc", size7, size6 * 10/11 , size6 * 10/9); - - // No data loss - byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot() - .getProperty("blob2").getValue(Type.BINARY).getNewStream()); - assertEquals(blobSize, blob.length); - } finally { - fileStore.close(); - } - } - - @Test - public void offlineCompaction() throws Exception { - SegmentGCOptions gcOptions = defaultGCOptions().setOffline(); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) - .withMaxFileSize(1) - .withGCOptions(gcOptions) - .withStatisticsProvider(new DefaultStatisticsProvider(executor)) - .build(); - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - - try { - // 5MB blob - int blobSize = 5 * 1024 * 1024; - - // Create ~2MB of data - NodeBuilder extra = nodeStore.getRoot().builder(); - NodeBuilder content = extra.child("content"); - for (int i = 0; i < 10000; i++) { - NodeBuilder c = content.child("c" + i); - for (int j = 0; j < 1000; j++) { - c.setProperty("p" + i, "v" + i); - } - } - nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size1 = fileStore.getStats().getApproximateSize(); - log.debug("File store size {}", byteCountToDisplaySize(size1)); - - // Create a property with 5 MB blob - NodeBuilder builder = nodeStore.getRoot().builder(); - builder.setProperty("blob1", createBlob(nodeStore, blobSize)); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size2 = fileStore.getStats().getApproximateSize(); - assertSize("1st blob added", size2, size1 + blobSize, size1 + blobSize + (blobSize / 100)); - - // Now remove the property. No gc yet -> size doesn't shrink - builder = nodeStore.getRoot().builder(); - builder.removeProperty("blob1"); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size3 = fileStore.getStats().getApproximateSize(); - assertSize("1st blob removed", size3, size2, size2 + 4096); - - // 1st gc cycle -> 1st blob should get collected - fileStore.compact(); - fileStore.cleanup(); - - long size4 = fileStore.getStats().getApproximateSize(); - assertSize("1st gc", size4, size3 - blobSize - size1, size3 - - blobSize); - - // Add another 5MB binary - builder = nodeStore.getRoot().builder(); - builder.setProperty("blob2", createBlob(nodeStore, blobSize)); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - long size5 = fileStore.getStats().getApproximateSize(); - assertSize("2nd blob added", size5, size4 + blobSize, size4 + blobSize + (blobSize / 100)); - - // 2st gc cycle -> 2nd blob should *not* be collected - fileStore.compact(); - fileStore.cleanup(); - - long size6 = fileStore.getStats().getApproximateSize(); - assertSize("2nd gc", size6, size5 * 10/11, size5 * 10/9); - - // 3rd gc cycle -> no significant change - fileStore.compact(); - fileStore.cleanup(); - - long size7 = fileStore.getStats().getApproximateSize(); - assertSize("3rd gc", size7, size6 * 10/11 , size6 * 10/9); - - // No data loss - byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot() - .getProperty("blob2").getValue(Type.BINARY).getNewStream()); - assertEquals(blobSize, blob.length); - } finally { - fileStore.close(); - } - } - - /** - * Create a lot of data nodes (no binaries) and a few checkpoints, verify - * that compacting checkpoints will not cause the size to explode - */ - @Test - public void offlineCompactionCps() throws Exception { - SegmentGCOptions gcOptions = defaultGCOptions().setOffline(); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) - .withMaxFileSize(1) - .withGCOptions(gcOptions) - .withStatisticsProvider(new DefaultStatisticsProvider(executor)) - .build(); - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - try { - // Create ~2MB of data - NodeBuilder extra = nodeStore.getRoot().builder(); - NodeBuilder content = extra.child("content"); - for (int i = 0; i < 10000; i++) { - NodeBuilder c = content.child("c" + i); - for (int j = 0; j < 1000; j++) { - c.setProperty("p" + i, "v" + i); - } - } - nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - fileStore.compact(); - fileStore.cleanup(); - // Compacts to 548Kb - long size0 = fileStore.getStats().getApproximateSize(); - - int cpNo = 4; - Set cps = new HashSet(); - for (int i = 0; i < cpNo; i++) { - cps.add(nodeStore.checkpoint(60000)); - } - assertEquals(cpNo, cps.size()); - for (String cp : cps) { - assertTrue(nodeStore.retrieve(cp) != null); - } - - long size1 = fileStore.getStats().getApproximateSize(); - assertSize("with checkpoints added", size1, size0, size0 * 11 / 10); - fileStore.compact(); - fileStore.cleanup(); - long size2 = fileStore.getStats().getApproximateSize(); - assertSize("with checkpoints compacted", size2, size1 * 9/10, size1 * 11 / 10); - } finally { - fileStore.close(); - } - } - - /** - * Create 2 binary nodes with same content but not same reference. Verify - * de-duplication capabilities of compaction. - */ - @Test - public void offlineCompactionBinC1() throws Exception { - SegmentGCOptions gcOptions = defaultGCOptions().setOffline() - .withBinaryDeduplication(); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) - .withMaxFileSize(1) - .withGCOptions(gcOptions) - .withStatisticsProvider(new DefaultStatisticsProvider(executor)) - .build(); - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders - .builder(fileStore).build(); - - try { - NodeBuilder extra = nodeStore.getRoot().builder(); - NodeBuilder content = extra.child("content"); - - int blobSize = 5 * 1024 * 1024; - byte[] data = new byte[blobSize]; - new Random().nextBytes(data); - - NodeBuilder c1 = content.child("c1"); - Blob b1 = nodeStore.createBlob(new ByteArrayInputStream(data)); - c1.setProperty("blob1", b1); - NodeBuilder c2 = content.child("c2"); - Blob b2 = nodeStore.createBlob(new ByteArrayInputStream(data)); - c2.setProperty("blob2", b2); - nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - int cpNo = 4; - Set cps = new HashSet(); - for (int i = 0; i < cpNo; i++) { - cps.add(nodeStore.checkpoint(60000)); - } - assertEquals(cpNo, cps.size()); - for (String cp : cps) { - assertTrue(nodeStore.retrieve(cp) != null); - } - - long size1 = fileStore.getStats().getApproximateSize(); - fileStore.compact(); - fileStore.cleanup(); - long size2 = fileStore.getStats().getApproximateSize(); - assertSize("with compacted binaries", size2, 0, size1 - blobSize); - } finally { - fileStore.close(); - } + private static Blob createBlob(NodeStore nodeStore, int size) throws IOException { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return nodeStore.createBlob(new ByteArrayInputStream(data)); } - /** - * Create 2 binary nodes with same content but not same reference. Reduce - * the max size if de-duplicated binaries under the binary length. Verify - * de-duplication capabilities of compaction. - */ - @Test - public void offlineCompactionBinC2() throws Exception { - int blobSize = 5 * 1024 * 1024; - - SegmentGCOptions gcOptions = defaultGCOptions().setOffline() - .withBinaryDeduplication() - .setBinaryDeduplicationMaxSize(blobSize / 2); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) - .withMaxFileSize(1) - .withGCOptions(gcOptions) - .withStatisticsProvider(new DefaultStatisticsProvider(executor)) - .build(); - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders - .builder(fileStore).build(); - - try { - NodeBuilder extra = nodeStore.getRoot().builder(); - NodeBuilder content = extra.child("content"); - - byte[] data = new byte[blobSize]; - new Random().nextBytes(data); - - NodeBuilder c1 = content.child("c1"); - Blob b1 = nodeStore.createBlob(new ByteArrayInputStream(data)); - c1.setProperty("blob1", b1); - NodeBuilder c2 = content.child("c2"); - Blob b2 = nodeStore.createBlob(new ByteArrayInputStream(data)); - c2.setProperty("blob2", b2); - nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - int cpNo = 4; - Set cps = new HashSet(); - for (int i = 0; i < cpNo; i++) { - cps.add(nodeStore.checkpoint(60000)); - } - assertEquals(cpNo, cps.size()); - for (String cp : cps) { - assertTrue(nodeStore.retrieve(cp) != null); - } - - long size1 = fileStore.getStats().getApproximateSize(); - fileStore.compact(); - fileStore.cleanup(); - long size2 = fileStore.getStats().getApproximateSize(); - - // not expected to reduce the size too much, as the binaries are - // above the threshold - assertSize("with compacted binaries", size2, size1 * 9 / 10, - size1 * 11 / 10); - } finally { - fileStore.close(); - } - } +// @Test +// public void testCancelCompaction() +// throws Throwable { +// final FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withGCOptions(defaultGCOptions().setRetainedGenerations(2)) +// .withMaxFileSize(1) +// .build(); +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); +// +// NodeBuilder builder = nodeStore.getRoot().builder(); +// addNodes(builder, 10); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// fileStore.flush(); +// +// FutureTask async = runAsync(new Callable() { +// @Override +// public Boolean call() throws IOException { +// boolean cancelled = false; +// for (int k = 0; !cancelled && k < 1000; k++) { +// cancelled = !fileStore.compact(); +// } +// return cancelled; +// } +// }); +// +// // Give the compaction thread a head start +// sleepUninterruptibly(1, SECONDS); +// +// fileStore.close(); +// try { +// assertTrue(async.get()); +// } catch (ExecutionException e) { +// if (!(e.getCause() instanceof IllegalStateException)) { +// // Throw cause unless this is an ISE thrown by the +// // store being already closed, which is kinda expected +// throw e.getCause(); +// } +// } +// } +// +// private static void addNodes(NodeBuilder builder, int depth) { +// if (depth > 0) { +// NodeBuilder child1 = builder.setChildNode("1"); +// addNodes(child1, depth - 1); +// NodeBuilder child2 = builder.setChildNode("2"); +// addNodes(child2, depth - 1); +// } +// } +// +// /** +// * Regression test for OAK-2192 testing for mixed segments. This test does not +// * cover OAK-3348. I.e. it does not assert the segment graph is free of cross +// * gc generation references. +// */ +// @Test +// public void testMixedSegments() throws Exception { +// FileStore store = fileStoreBuilder(getFileStoreFolder()) +// .withMaxFileSize(2) +// .withMemoryMapping(true) +// .withGCOptions(defaultGCOptions().setForceAfterFail(true)) +// .build(); +// final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(store).build(); +// final AtomicBoolean compactionSuccess = new AtomicBoolean(true); +// +// NodeBuilder root = nodeStore.getRoot().builder(); +// createNodes(root.setChildNode("test"), 10, 3); +// nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// +// final Set beforeSegments = new HashSet(); +// collectSegments(store.getReader(), store.getRevisions(), beforeSegments); +// +// final AtomicReference run = new AtomicReference(true); +// final List failedCommits = newArrayList(); +// Thread[] threads = new Thread[10]; +// for (int k = 0; k < threads.length; k++) { +// final int threadId = k; +// threads[k] = new Thread(new Runnable() { +// @Override +// public void run() { +// for (int j = 0; run.get(); j++) { +// String nodeName = "b-" + threadId + "," + j; +// try { +// NodeBuilder root = nodeStore.getRoot().builder(); +// root.setChildNode(nodeName); +// nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// Thread.sleep(5); +// } catch (CommitFailedException e) { +// failedCommits.add(nodeName); +// } catch (InterruptedException e) { +// Thread.interrupted(); +// break; +// } +// } +// } +// }); +// threads[k].start(); +// } +// store.compact(); +// run.set(false); +// for (Thread t : threads) { +// t.join(); +// } +// store.flush(); +// +// assumeTrue("Failed to acquire compaction lock", compactionSuccess.get()); +// assertTrue("Failed commits: " + failedCommits, failedCommits.isEmpty()); +// +// Set afterSegments = new HashSet(); +// collectSegments(store.getReader(), store.getRevisions(), afterSegments); +// try { +// for (UUID u : beforeSegments) { +// assertFalse("Mixed segments found: " + u, afterSegments.contains(u)); +// } +// } finally { +// store.close(); +// } +// } +// +// /** +// * Set a root node referring to a child node that lives in a different segments. Depending +// * on the order how the SegmentBufferWriters associated with the threads used to create the +// * nodes are flushed, this will introduce a forward reference between the segments. +// * The current cleanup mechanism cannot handle forward references and removes the referenced +// * segment causing a SNFE. +// * This is a regression introduced with OAK-1828. +// */ +// @Test +// public void cleanupCyclicGraph() throws Exception { +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build(); +// final SegmentWriter writer = fileStore.getWriter(); +// final SegmentNodeState oldHead = fileStore.getHead(); +// +// final SegmentNodeState child = run(new Callable() { +// @Override +// public SegmentNodeState call() throws Exception { +// NodeBuilder builder = EMPTY_NODE.builder(); +// return writer.writeNode(EMPTY_NODE); +// } +// }); +// SegmentNodeState newHead = run(new Callable() { +// @Override +// public SegmentNodeState call() throws Exception { +// NodeBuilder builder = oldHead.builder(); +// builder.setChildNode("child", child); +// return writer.writeNode(builder.getNodeState()); +// } +// }); +// +// writer.flush(); +// fileStore.getRevisions().setHead(oldHead.getRecordId(), newHead.getRecordId()); +// fileStore.close(); +// +// fileStore = fileStoreBuilder(getFileStoreFolder()).build(); +// +// traverse(fileStore.getHead()); +// fileStore.cleanup(); +// +// // Traversal after cleanup might result in an SNFE +// traverse(fileStore.getHead()); +// +// fileStore.close(); +// } +// +// private static void traverse(NodeState node) { +// for (ChildNodeEntry childNodeEntry : node.getChildNodeEntries()) { +// traverse(childNodeEntry.getNodeState()); +// } +// } +// +// private static T run(Callable callable) throws InterruptedException, ExecutionException { +// FutureTask task = new FutureTask(callable); +// new Thread(task).start(); +// return task.get(); +// } +// +// private static FutureTask runAsync(Callable callable) { +// FutureTask task = new FutureTask(callable); +// new Thread(task).start(); +// return task; +// } +// +// /** +// * Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments +// */ +// @Test +// public void preCompactionReferences() throws Exception { +// for (String ref : new String[] {"merge-before-compact", "merge-after-compact"}) { +// File repoDir = new File(getFileStoreFolder(), ref); +// FileStore fileStore = fileStoreBuilder(repoDir) +// .withMaxFileSize(2) +// .withGCOptions(defaultGCOptions()) +// .build(); +// final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); +// try { +// // add some content +// NodeBuilder preGCBuilder = nodeStore.getRoot().builder(); +// preGCBuilder.setChildNode("test").setProperty("blob", createBlob(nodeStore, 1024 * 1024)); +// nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// +// // remove it again so we have something to gc +// preGCBuilder = nodeStore.getRoot().builder(); +// preGCBuilder.getChildNode("test").remove(); +// nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// +// // with a new builder simulate exceeding the update limit. +// // This will cause changes to be pre-written to segments +// preGCBuilder = nodeStore.getRoot().builder(); +// preGCBuilder.setChildNode("test").setChildNode("a").setChildNode("b").setProperty("foo", "bar"); +// for (int k = 0; k < getInteger("update.limit", 10000); k += 2) { +// preGCBuilder.setChildNode("dummy").remove(); +// } +// +// // case 1: merge above changes before compact +// if ("merge-before-compact".equals(ref)) { +// NodeBuilder builder = nodeStore.getRoot().builder(); +// builder.setChildNode("n"); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// } +// +// // Ensure cleanup is efficient by surpassing the number of +// // retained generations +// for (int k = 0; k < defaultGCOptions().getRetainedGenerations(); k++) { +// fileStore.compact(); +// } +// +// // case 2: merge above changes after compact +// if ("merge-after-compact".equals(ref)) { +// NodeBuilder builder = nodeStore.getRoot().builder(); +// builder.setChildNode("n"); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// } +// } finally { +// fileStore.close(); +// } +// +// // Re-initialise the file store to simulate off-line gc +// fileStore = fileStoreBuilder(repoDir).withMaxFileSize(2).build(); +// try { +// // The 1M blob should get gc-ed +// fileStore.cleanup(); +// assertTrue(ref + " repository size " + fileStore.getStats().getApproximateSize() + " < " + 1024 * 1024, +// fileStore.getStats().getApproximateSize() < 1024 * 1024); +// } finally { +// fileStore.close(); +// } +// } +// } +// +// private static void collectSegments(SegmentReader reader, Revisions revisions, +// final Set segmentIds) { +// new SegmentParser(reader) { +// @Override +// protected void onNode(RecordId parentId, RecordId nodeId) { +// super.onNode(parentId, nodeId); +// segmentIds.add(nodeId.asUUID()); +// } +// +// @Override +// protected void onTemplate(RecordId parentId, RecordId templateId) { +// super.onTemplate(parentId, templateId); +// segmentIds.add(templateId.asUUID()); +// } +// +// @Override +// protected void onMap(RecordId parentId, RecordId mapId, MapRecord map) { +// super.onMap(parentId, mapId, map); +// segmentIds.add(mapId.asUUID()); +// } +// +// @Override +// protected void onMapDiff(RecordId parentId, RecordId mapId, MapRecord map) { +// super.onMapDiff(parentId, mapId, map); +// segmentIds.add(mapId.asUUID()); +// } +// +// @Override +// protected void onMapLeaf(RecordId parentId, RecordId mapId, MapRecord map) { +// super.onMapLeaf(parentId, mapId, map); +// segmentIds.add(mapId.asUUID()); +// } +// +// @Override +// protected void onMapBranch(RecordId parentId, RecordId mapId, MapRecord map) { +// super.onMapBranch(parentId, mapId, map); +// segmentIds.add(mapId.asUUID()); +// } +// +// @Override +// protected void onProperty(RecordId parentId, RecordId propertyId, PropertyTemplate template) { +// super.onProperty(parentId, propertyId, template); +// segmentIds.add(propertyId.asUUID()); +// } +// +// @Override +// protected void onValue(RecordId parentId, RecordId valueId, Type type) { +// super.onValue(parentId, valueId, type); +// segmentIds.add(valueId.asUUID()); +// } +// +// @Override +// protected void onBlob(RecordId parentId, RecordId blobId) { +// super.onBlob(parentId, blobId); +// segmentIds.add(blobId.asUUID()); +// } +// +// @Override +// protected void onString(RecordId parentId, RecordId stringId) { +// super.onString(parentId, stringId); +// segmentIds.add(stringId.asUUID()); +// } +// +// @Override +// protected void onList(RecordId parentId, RecordId listId, int count) { +// super.onList(parentId, listId, count); +// segmentIds.add(listId.asUUID()); +// } +// +// @Override +// protected void onListBucket(RecordId parentId, RecordId listId, int index, int count, int capacity) { +// super.onListBucket(parentId, listId, index, count, capacity); +// segmentIds.add(listId.asUUID()); +// } +// }.parseNode(revisions.getHead()); +// } +// +// private static void createNodes(NodeBuilder builder, int count, int depth) { +// if (depth > 0) { +// for (int k = 0; k < count; k++) { +// NodeBuilder child = builder.setChildNode("node" + k); +// createProperties(child, count); +// createNodes(child, count, depth - 1); +// } +// } +// } +// +// private static void createProperties(NodeBuilder builder, int count) { +// for (int k = 0; k < count; k++) { +// builder.setProperty("property-" + UUID.randomUUID().toString(), "value-" + UUID.randomUUID().toString()); +// } +// } +// +// @Test +// public void propertyRetention() throws Exception { +// SegmentGCOptions gcOptions = defaultGCOptions(); +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) +// .withMaxFileSize(1) +// .withGCOptions(gcOptions) +// .build(); +// try { +// final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); +// +// // Add a property +// NodeBuilder builder = nodeStore.getRoot().builder(); +// builder.setChildNode("test").setProperty("property", "value"); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// +// // Segment id of the current segment +// NodeState test = nodeStore.getRoot().getChildNode("test"); +// SegmentId id = ((SegmentNodeState) test).getRecordId().getSegmentId(); +// fileStore.flush(); +// assertTrue(fileStore.containsSegment(id)); +// +// // Add enough content to fill up the current tar file +// builder = nodeStore.getRoot().builder(); +// addContent(builder.setChildNode("dump")); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// +// // Segment and property still there +// assertTrue(fileStore.containsSegment(id)); +// PropertyState property = test.getProperty("property"); +// assertEquals("value", property.getValue(STRING)); +// +// // GC should remove the segment +// fileStore.flush(); +// // Ensure cleanup is efficient by surpassing the number of +// // retained generations +// for (int k = 0; k < gcOptions.getRetainedGenerations(); k++) { +// fileStore.compact(); +// } +// fileStore.cleanup(); +// +// try { +// fileStore.readSegment(id); +// fail("Segment " + id + " should be gc'ed"); +// } catch (SegmentNotFoundException ignore) {} +// } finally { +// fileStore.close(); +// } +// } +// +// @Test +// public void checkpointDeduplicationTest() throws Exception { +// FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build(); +// try { +// SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); +// NodeBuilder builder = nodeStore.getRoot().builder(); +// builder.setChildNode("a").setChildNode("aa"); +// builder.setChildNode("b").setChildNode("bb"); +// builder.setChildNode("c").setChildNode("cc"); +// nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); +// +// String cpId = nodeStore.checkpoint(Long.MAX_VALUE); +// +// NodeState uncompacted = nodeStore.getRoot(); +// fileStore.compact(); +// NodeState compacted = nodeStore.getRoot(); +// +// assertEquals(uncompacted, compacted); +// assertTrue(uncompacted instanceof SegmentNodeState); +// assertTrue(compacted instanceof SegmentNodeState); +// assertEquals(((SegmentNodeState)uncompacted).getStableId(), ((SegmentNodeState)compacted).getStableId()); +// +// NodeState checkpoint = nodeStore.retrieve(cpId); +// assertTrue(checkpoint instanceof SegmentNodeState); +// assertEquals("Checkpoint should get de-duplicated", +// ((Record) compacted).getRecordId(), ((Record) checkpoint).getRecordId()); +// } finally { +// fileStore.close(); +// } +// } /** - * Create 2 binary nodes with same content and same reference. Verify - * de-duplication capabilities of compaction + * Test asserting OAK-4106 : Concurrent writes during cleanup should not + * impact reclaimedSize computation. + * Also asserting OAK-4669: No new generation of tar should be created when the content is the same + * and when there are various indices created. */ @Test - public void offlineCompactionBinR1() throws Exception { - SegmentGCOptions gcOptions = defaultGCOptions().setOffline(); + public void concurrentWritesDuringCleanup() throws Exception { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) - .withMaxFileSize(1) - .withGCOptions(gcOptions) - .withStatisticsProvider(new DefaultStatisticsProvider(executor)) - .build(); - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders - .builder(fileStore).build(); - - try { - NodeBuilder extra = nodeStore.getRoot().builder(); - NodeBuilder content = extra.child("content"); - - int blobSize = 5 * 1024 * 1024; - byte[] data = new byte[blobSize]; - new Random().nextBytes(data); - Blob b = nodeStore.createBlob(new ByteArrayInputStream(data)); - - NodeBuilder c1 = content.child("c1"); - c1.setProperty("blob1", b); - NodeBuilder c2 = content.child("c2"); - c2.setProperty("blob2", b); - nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - int cpNo = 4; - Set cps = new HashSet(); - for (int i = 0; i < cpNo; i++) { - cps.add(nodeStore.checkpoint(60000)); - } - assertEquals(cpNo, cps.size()); - for (String cp : cps) { - assertTrue(nodeStore.retrieve(cp) != null); - } - - // 5Mb, de-duplication by the SegmentWriter - long size1 = fileStore.getStats().getApproximateSize(); - fileStore.compact(); - fileStore.cleanup(); - long size2 = fileStore.getStats().getApproximateSize(); - assertSize("with compacted binaries", size2, 0, size1 * 11 / 10); - } finally { - fileStore.close(); + StatisticsProvider statsProvider = new DefaultStatisticsProvider(executor); + final FileStoreGCMonitor fileStoreGCMonitor = new FileStoreGCMonitor(Clock.SIMPLE); + + File repoPath = new File("/Users/dulceanu/work/test-repo"); + for (File f : repoPath.listFiles()) { + f.delete(); } - } - - private static void assertSize(String info, long size, long lower, long upper) { - log.debug("File Store {} size {}, expected in interval [{},{}]", - info, size, lower, upper); - assertTrue("File Store " + info + " size expected in interval " + - "[" + (lower) + "," + (upper) + "] but was: " + (size), - size >= lower && size <= (upper)); - } - - private static Blob createBlob(NodeStore nodeStore, int size) throws IOException { - byte[] data = new byte[size]; - new Random().nextBytes(data); - return nodeStore.createBlob(new ByteArrayInputStream(data)); - } - - @Test - public void testCancelCompaction() - throws Throwable { - final FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) + + final FileStore fileStore = fileStoreBuilder(repoPath) .withGCOptions(defaultGCOptions().setRetainedGenerations(2)) + .withGCMonitor(fileStoreGCMonitor) + .withStatisticsProvider(statsProvider) .withMaxFileSize(1) .build(); - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - - NodeBuilder builder = nodeStore.getRoot().builder(); - addNodes(builder, 10); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - fileStore.flush(); - - FutureTask async = runAsync(new Callable() { - @Override - public Boolean call() throws IOException { - boolean cancelled = false; - for (int k = 0; !cancelled && k < 1000; k++) { - cancelled = !fileStore.compact(); - } - return cancelled; - } - }); - - // Give the compaction thread a head start - sleepUninterruptibly(1, SECONDS); + + final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - fileStore.close(); try { - assertTrue(async.get()); - } catch (ExecutionException e) { - if (!(e.getCause() instanceof IllegalStateException)) { - // Throw cause unless this is an ISE thrown by the - // store being already closed, which is kinda expected - throw e.getCause(); - } - } - } - - private static void addNodes(NodeBuilder builder, int depth) { - if (depth > 0) { - NodeBuilder child1 = builder.setChildNode("1"); - addNodes(child1, depth - 1); - NodeBuilder child2 = builder.setChildNode("2"); - addNodes(child2, depth - 1); - } - } - - /** - * Regression test for OAK-2192 testing for mixed segments. This test does not - * cover OAK-3348. I.e. it does not assert the segment graph is free of cross - * gc generation references. - */ - @Test - public void testMixedSegments() throws Exception { - FileStore store = fileStoreBuilder(getFileStoreFolder()) - .withMaxFileSize(2) - .withMemoryMapping(true) - .withGCOptions(defaultGCOptions().setForceAfterFail(true)) - .build(); - final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(store).build(); - final AtomicBoolean compactionSuccess = new AtomicBoolean(true); - - NodeBuilder root = nodeStore.getRoot().builder(); - createNodes(root.setChildNode("test"), 10, 3); - nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); - - final Set beforeSegments = new HashSet(); - collectSegments(store.getReader(), store.getRevisions(), beforeSegments); - - final AtomicReference run = new AtomicReference(true); - final List failedCommits = newArrayList(); - Thread[] threads = new Thread[10]; - for (int k = 0; k < threads.length; k++) { - final int threadId = k; - threads[k] = new Thread(new Runnable() { - @Override + Runnable concurrentWriteTask = new Runnable() { public void run() { - for (int j = 0; run.get(); j++) { - String nodeName = "b-" + threadId + "," + j; - try { - NodeBuilder root = nodeStore.getRoot().builder(); - root.setChildNode(nodeName); - nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); - Thread.sleep(5); - } catch (CommitFailedException e) { - failedCommits.add(nodeName); - } catch (InterruptedException e) { - Thread.interrupted(); - break; - } + try { + NodeBuilder builder = nodeStore.getRoot().builder(); + builder.setProperty("blob" + new Random().nextInt(), createBlob(nodeStore, 25 * 25)); + + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + fileStore.flush(); + } catch (CommitFailedException e) { + // ignore + } catch (IOException e) { + // ignore } - } - }); - threads[k].start(); - } - store.compact(); - run.set(false); - for (Thread t : threads) { - t.join(); - } - store.flush(); - - assumeTrue("Failed to acquire compaction lock", compactionSuccess.get()); - assertTrue("Failed commits: " + failedCommits, failedCommits.isEmpty()); - - Set afterSegments = new HashSet(); - collectSegments(store.getReader(), store.getRevisions(), afterSegments); - try { - for (UUID u : beforeSegments) { - assertFalse("Mixed segments found: " + u, afterSegments.contains(u)); - } - } finally { - store.close(); - } - } - - /** - * Set a root node referring to a child node that lives in a different segments. Depending - * on the order how the SegmentBufferWriters associated with the threads used to create the - * nodes are flushed, this will introduce a forward reference between the segments. - * The current cleanup mechanism cannot handle forward references and removes the referenced - * segment causing a SNFE. - * This is a regression introduced with OAK-1828. - */ - @Test - public void cleanupCyclicGraph() throws Exception { - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build(); - final SegmentWriter writer = fileStore.getWriter(); - final SegmentNodeState oldHead = fileStore.getHead(); - - final SegmentNodeState child = run(new Callable() { - @Override - public SegmentNodeState call() throws Exception { - NodeBuilder builder = EMPTY_NODE.builder(); - return writer.writeNode(EMPTY_NODE); - } - }); - SegmentNodeState newHead = run(new Callable() { - @Override - public SegmentNodeState call() throws Exception { - NodeBuilder builder = oldHead.builder(); - builder.setChildNode("child", child); - return writer.writeNode(builder.getNodeState()); + }; + }; + + ExecutorService executorService = Executors.newFixedThreadPool(100); + for (int i = 0; i < 100; i++) { + executorService.execute(concurrentWriteTask); } - }); - writer.flush(); - fileStore.getRevisions().setHead(oldHead.getRecordId(), newHead.getRecordId()); - fileStore.close(); - - fileStore = fileStoreBuilder(getFileStoreFolder()).build(); - - traverse(fileStore.getHead()); - fileStore.cleanup(); - - // Traversal after cleanup might result in an SNFE - traverse(fileStore.getHead()); - - fileStore.close(); - } - - private static void traverse(NodeState node) { - for (ChildNodeEntry childNodeEntry : node.getChildNodeEntries()) { - traverse(childNodeEntry.getNodeState()); - } - } - - private static T run(Callable callable) throws InterruptedException, ExecutionException { - FutureTask task = new FutureTask(callable); - new Thread(task).start(); - return task.get(); - } - - private static FutureTask runAsync(Callable callable) { - FutureTask task = new FutureTask(callable); - new Thread(task).start(); - return task; - } - - /** - * Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments - */ - @Test - public void preCompactionReferences() throws Exception { - for (String ref : new String[] {"merge-before-compact", "merge-after-compact"}) { - File repoDir = new File(getFileStoreFolder(), ref); - FileStore fileStore = fileStoreBuilder(repoDir) - .withMaxFileSize(2) - .withGCOptions(defaultGCOptions()) - .build(); - final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - try { - // add some content - NodeBuilder preGCBuilder = nodeStore.getRoot().builder(); - preGCBuilder.setChildNode("test").setProperty("blob", createBlob(nodeStore, 1024 * 1024)); - nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - - // remove it again so we have something to gc - preGCBuilder = nodeStore.getRoot().builder(); - preGCBuilder.getChildNode("test").remove(); - nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - - // with a new builder simulate exceeding the update limit. - // This will cause changes to be pre-written to segments - preGCBuilder = nodeStore.getRoot().builder(); - preGCBuilder.setChildNode("test").setChildNode("a").setChildNode("b").setProperty("foo", "bar"); - for (int k = 0; k < getInteger("update.limit", 10000); k += 2) { - preGCBuilder.setChildNode("dummy").remove(); - } - - // case 1: merge above changes before compact - if ("merge-before-compact".equals(ref)) { - NodeBuilder builder = nodeStore.getRoot().builder(); - builder.setChildNode("n"); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - } - - // Ensure cleanup is efficient by surpassing the number of - // retained generations - for (int k = 0; k < defaultGCOptions().getRetainedGenerations(); k++) { - fileStore.compact(); - } - - // case 2: merge above changes after compact - if ("merge-after-compact".equals(ref)) { - NodeBuilder builder = nodeStore.getRoot().builder(); - builder.setChildNode("n"); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - } - } finally { - fileStore.close(); - } - - // Re-initialise the file store to simulate off-line gc - fileStore = fileStoreBuilder(repoDir).withMaxFileSize(2).build(); - try { - // The 1M blob should get gc-ed - fileStore.cleanup(); - assertTrue(ref + " repository size " + fileStore.getStats().getApproximateSize() + " < " + 1024 * 1024, - fileStore.getStats().getApproximateSize() < 1024 * 1024); - } finally { - fileStore.close(); - } - } - } - - private static void collectSegments(SegmentReader reader, Revisions revisions, - final Set segmentIds) { - new SegmentParser(reader) { - @Override - protected void onNode(RecordId parentId, RecordId nodeId) { - super.onNode(parentId, nodeId); - segmentIds.add(nodeId.asUUID()); - } - - @Override - protected void onTemplate(RecordId parentId, RecordId templateId) { - super.onTemplate(parentId, templateId); - segmentIds.add(templateId.asUUID()); - } - - @Override - protected void onMap(RecordId parentId, RecordId mapId, MapRecord map) { - super.onMap(parentId, mapId, map); - segmentIds.add(mapId.asUUID()); - } - - @Override - protected void onMapDiff(RecordId parentId, RecordId mapId, MapRecord map) { - super.onMapDiff(parentId, mapId, map); - segmentIds.add(mapId.asUUID()); - } - - @Override - protected void onMapLeaf(RecordId parentId, RecordId mapId, MapRecord map) { - super.onMapLeaf(parentId, mapId, map); - segmentIds.add(mapId.asUUID()); - } - - @Override - protected void onMapBranch(RecordId parentId, RecordId mapId, MapRecord map) { - super.onMapBranch(parentId, mapId, map); - segmentIds.add(mapId.asUUID()); - } - - @Override - protected void onProperty(RecordId parentId, RecordId propertyId, PropertyTemplate template) { - super.onProperty(parentId, propertyId, template); - segmentIds.add(propertyId.asUUID()); - } - - @Override - protected void onValue(RecordId parentId, RecordId valueId, Type type) { - super.onValue(parentId, valueId, type); - segmentIds.add(valueId.asUUID()); - } - - @Override - protected void onBlob(RecordId parentId, RecordId blobId) { - super.onBlob(parentId, blobId); - segmentIds.add(blobId.asUUID()); - } - - @Override - protected void onString(RecordId parentId, RecordId stringId) { - super.onString(parentId, stringId); - segmentIds.add(stringId.asUUID()); - } - - @Override - protected void onList(RecordId parentId, RecordId listId, int count) { - super.onList(parentId, listId, count); - segmentIds.add(listId.asUUID()); - } - - @Override - protected void onListBucket(RecordId parentId, RecordId listId, int index, int count, int capacity) { - super.onListBucket(parentId, listId, index, count, capacity); - segmentIds.add(listId.asUUID()); - } - }.parseNode(revisions.getHead()); - } - - private static void createNodes(NodeBuilder builder, int count, int depth) { - if (depth > 0) { - for (int k = 0; k < count; k++) { - NodeBuilder child = builder.setChildNode("node" + k); - createProperties(child, count); - createNodes(child, count, depth - 1); - } - } - } - - private static void createProperties(NodeBuilder builder, int count) { - for (int k = 0; k < count; k++) { - builder.setProperty("property-" + UUID.randomUUID().toString(), "value-" + UUID.randomUUID().toString()); - } - } - - @Test - public void propertyRetention() throws Exception { - SegmentGCOptions gcOptions = defaultGCOptions(); - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()) - .withMaxFileSize(1) - .withGCOptions(gcOptions) - .build(); - try { - final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - - // Add a property - NodeBuilder builder = nodeStore.getRoot().builder(); - builder.setChildNode("test").setProperty("property", "value"); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - - // Segment id of the current segment - NodeState test = nodeStore.getRoot().getChildNode("test"); - SegmentId id = ((SegmentNodeState) test).getRecordId().getSegmentId(); - fileStore.flush(); - assertTrue(fileStore.containsSegment(id)); - - // Add enough content to fill up the current tar file - builder = nodeStore.getRoot().builder(); - addContent(builder.setChildNode("dump")); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - - // Segment and property still there - assertTrue(fileStore.containsSegment(id)); - PropertyState property = test.getProperty("property"); - assertEquals("value", property.getValue(STRING)); - - // GC should remove the segment - fileStore.flush(); - // Ensure cleanup is efficient by surpassing the number of - // retained generations - for (int k = 0; k < gcOptions.getRetainedGenerations(); k++) { - fileStore.compact(); - } + Thread.sleep(1 * 200); fileStore.cleanup(); - - try { - fileStore.readSegment(id); - fail("Segment " + id + " should be gc'ed"); - } catch (SegmentNotFoundException ignore) {} - } finally { - fileStore.close(); - } - } - - @Test - public void checkpointDeduplicationTest() throws Exception { - FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build(); - try { - SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); - NodeBuilder builder = nodeStore.getRoot().builder(); - builder.setChildNode("a").setChildNode("aa"); - builder.setChildNode("b").setChildNode("bb"); - builder.setChildNode("c").setChildNode("cc"); - nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - - String cpId = nodeStore.checkpoint(Long.MAX_VALUE); - - NodeState uncompacted = nodeStore.getRoot(); - fileStore.compact(); - NodeState compacted = nodeStore.getRoot(); - - assertEquals(uncompacted, compacted); - assertTrue(uncompacted instanceof SegmentNodeState); - assertTrue(compacted instanceof SegmentNodeState); - assertEquals(((SegmentNodeState)uncompacted).getStableId(), ((SegmentNodeState)compacted).getStableId()); - - NodeState checkpoint = nodeStore.retrieve(cpId); - assertTrue(checkpoint instanceof SegmentNodeState); - assertEquals("Checkpoint should get de-duplicated", - ((Record) compacted).getRecordId(), ((Record) checkpoint).getRecordId()); +// assertTrue(fileStoreGCMonitor.getLastReclaimedSize() >= 0); + System.out.println(fileStoreGCMonitor.getLastReclaimedSize()); + + new ExecutorCloser(executorService).close(); } finally { fileStore.close(); } } - + private static void addContent(NodeBuilder builder) { for (int k = 0; k < 10000; k++) { builder.setProperty(UUID.randomUUID().toString(), UUID.randomUUID().toString());