Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (revision ) @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import java.io.InputStream; +import java.util.List; + +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; + +/** + * Interface to be implemented by a shared data store. + */ +public interface SharedDataStore { + /** + * Explicitly identifies the type of the data store. + */ + enum Type { + SHARED, DEFAULT + } + + /** + * Adds the root record. + * + * @param stream the stream + * @param name the name of the root record + * @throws DataStoreException the data store exception + */ + void addMetadataRecord(InputStream stream, String name) + throws DataStoreException; + + DataRecord getMetadataRecord(String name); + + /** + * Gets all the root records with the given prefix. + * + * @param prefix the prefix of the root records to return + * @return the matching root records + */ + List<DataRecord> getAllMetadataRecords(String prefix); + + /** + * Deletes the root record represented by the given parameters. + * + * @param name the name of the root record + * @return success/failure + */ + boolean deleteMetadataRecord(String name); + + /** + * Deletes all records matching the given prefix. + * + * @param prefix metadata type identifier + */ + void deleteAllMetadataRecords(String prefix); + + /** + * Gets the type.
+ * + * @return the type + */ + Type getType(); +} + Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (revision ) @@ -18,12 +18,16 @@ import java.io.Closeable; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Comparator; +import java.util.List; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.commons.IOUtils; import org.apache.jackrabbit.oak.commons.sort.ExternalSort; /** @@ -47,6 +51,13 @@ /** The garbage stores the garbage collection candidates which were not deleted. */ private final File garbage; + private final static Comparator<String> lexComparator = + new Comparator<String>() { + @Override + public int compare(String s1, String s2) { + return s1.compareTo(s2); + } + }; /** * Instantiates a new garbage collector file state. @@ -118,21 +129,26 @@ * * @param file file whose contents needs to be sorted */ - public void sort(File file) throws IOException { + public static void sort(File file) throws IOException { File sorted = createTempFile(); - Comparator<String> lexComparator = new Comparator<String>() { - @Override - public int compare(String s1, String s2) { - return s1.compareTo(s2); + merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted); + Files.move(sorted, file); - } + } - }; + + public static void merge(List<File> files, File output) throws IOException { ExternalSort.mergeSortedFiles( - ExternalSort.sortInBatch(file, lexComparator, true), - sorted, lexComparator, true); - Files.move(sorted, file); + files, + output, lexComparator, true); } - + - private File createTempFile() throws IOException { - return File.createTempFile("temp", null, home); + public static File copy(InputStream stream) throws IOException { + File file = createTempFile(); + IOUtils.copy(stream, + new FileOutputStream(file)); + return file; + } + + private static File createTempFile() throws IOException { + return File.createTempFile("temp", null); } -} \ No newline at end of file +} Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (date 1423545306000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (revision ) @@ -18,6 +18,7 @@ */ package org.apache.jackrabbit.oak.plugins.segment; +import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY; import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; @@ -36,10 +37,15 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.core.data.FileDataStore; +import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; @@ -64,8 +70,8 @@ if (nodeStore == null) { store = new FileStore(blobStore, getWorkDir(), 256, false); CompactionStrategy compactionStrategy = - new CompactionStrategy(false, true, - CompactionStrategy.CleanupType.CLEAN_OLD, 0, CompactionStrategy.MEMORY_THRESHOLD_DEFAULT) { + new CompactionStrategy(false, CompactionStrategy.CLONE_BINARIES_DEFAULT, + CompactionStrategy.CleanupType.CLEAN_ALL, 0, CompactionStrategy.MEMORY_THRESHOLD_DEFAULT) { @Override public boolean compacted(@Nonnull Callable<Boolean> setHead) throws Exception { return setHead.call(); @@ -132,16 +138,23 @@ @Test public void gc() throws Exception { HashSet<String> remaining = setUp(); + String repoId = null; + if (SharedDataStoreUtils.isShared(store.getBlobStore())) { + repoId = ClusterRepositoryInfo.createId(nodeStore); + ((SharedDataStore) store.getBlobStore()).addMetadataRecord( + new ByteArrayInputStream(new byte[0]), + REPOSITORY.getNameFromId(repoId)); + } MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector( new SegmentBlobReferenceRetriever(store.getTracker()), (GarbageCollectableBlobStore) store.getBlobStore(), MoreExecutors.sameThreadExecutor(), - "./target", 2048, true, 0); - gc.collectGarbage(); + "./target", 2048, 0, repoId); + gc.collectGarbage(false); Set<String> existingAfterGC = iterate(); - assertTrue(Sets.symmetricDifference(remaining, existingAfterGC).isEmpty()); + assertTrue(Sets.difference(remaining, existingAfterGC).isEmpty()); } protected Set<String> iterate() throws Exception { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (revision ) @@ -24,6 +24,7 @@ import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Dictionary; @@ -55,8 +56,11 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGC; import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore; import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import
org.apache.jackrabbit.oak.spi.state.NodeStore; @@ -291,6 +295,17 @@ mkBuilder.setExecutor(executor); mk = mkBuilder.open(); + // If a shared data store, register the repo id in the data store + if (SharedDataStoreUtils.isShared(blobStore)) { + try { + String repoId = ClusterRepositoryInfo.createId(mk.getNodeStore()); + ((SharedDataStore) blobStore).addMetadataRecord(new ByteArrayInputStream(new byte[0]), + SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY.getNameFromId(repoId)); + } catch (Exception e) { + throw new IOException("Could not register a unique repositoryId", e); + } + } + registerJMXBeans(mk.getNodeStore()); registerLastRevRecoveryJob(mk.getNodeStore()); @@ -443,11 +458,14 @@ if (store.getBlobStore() instanceof GarbageCollectableBlobStore) { BlobGarbageCollector gc = new BlobGarbageCollector() { @Override - public void collectGarbage() throws Exception { - store.createBlobGarbageCollector(blobGcMaxAgeInSecs).collectGarbage(); + public void collectGarbage(boolean markOnly) throws Exception { + store.createBlobGarbageCollector(blobGcMaxAgeInSecs, + ClusterRepositoryInfo.getId(mk.getNodeStore())) + .collectGarbage(markOnly); } }; - registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor), + registrations.add(registerMBean(whiteboard, BlobGCMBean.class, + new BlobGC(gc, executor), BlobGCMBean.TYPE, "Document node store blob garbage collection")); } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java (revision ) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java (revision ) @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import static org.hamcrest.CoreMatchers.instanceOf; +import junit.framework.Assert; + +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the ClusterRepositoryInfo unique cluster repository id.
+ */ +public class ClusterRepositoryInfoTest { + static BlobStore blobStore; + + @BeforeClass + public static void setup() { + try { + blobStore = DataStoreUtils.getBlobStore(); + Assume.assumeThat(blobStore, instanceOf(SharedDataStore.class)); + } catch (Exception e) { + Assume.assumeNoException(e); + } + } + + @Test + public void differentCluster() throws Exception { + DocumentNodeStore ds1 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(new MemoryDocumentStore()) + .setBlobStore(blobStore) + .getNodeStore(); + String repoId1 = ClusterRepositoryInfo.createId(ds1); + + DocumentNodeStore ds2 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(new MemoryDocumentStore()) + .setBlobStore(blobStore) + .getNodeStore(); + String repoId2 = ClusterRepositoryInfo.createId(ds2); + + // Different clusters must get different ids + Assert.assertFalse(repoId1.equals(repoId2)); + } + + @Test + public void sameCluster() throws Exception { + MemoryDocumentStore store = new MemoryDocumentStore(); + DocumentNodeStore ds1 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(store) + .setClusterId(1) + .setBlobStore(blobStore) + .getNodeStore(); + String repoId1 = ClusterRepositoryInfo.createId(ds1); + ds1.runBackgroundOperations(); + + DocumentNodeStore ds2 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(store) + .setClusterId(2) + .setBlobStore(blobStore) + .getNodeStore(); + String repoId2 = ClusterRepositoryInfo.createId(ds2); + + // Same cluster, so the ids should be equal + Assert.assertEquals(repoId1, repoId2); + } +} + Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (revision ) @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob.datastore; + +import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; + +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; + +/** + * Utility class for {@link SharedDataStore}.
+ */ +public class SharedDataStoreUtils { + /** + * Checks if the blob store is shared. + * + * @param blobStore the blob store + * @return true if shared + */ + public static boolean isShared(BlobStore blobStore) { + return (blobStore instanceof SharedDataStore) + && (((SharedDataStore) blobStore).getType() == SharedDataStore.Type.SHARED); + } + + /** + * Gets the earliest record of the available reference records. + * + * @param recs the records + * @return the earliest record + */ + public static DataRecord getEarliestRecord(List<DataRecord> recs) { + return Ordering.natural().onResultOf( + new Function<DataRecord, Long>() { + @Override + @Nullable + public Long apply(@Nullable DataRecord input) { + return input.getLastModified(); + } + }).min(recs); + } + + /** + * Returns the repositories for which marked references are not available. + * + * @param repos the repository records + * @param refs the reference records + * @return the set of repository ids whose references are not available + */ + public static Set<String> refsNotAvailableFromRepos(List<DataRecord> repos, + List<DataRecord> refs) { + return Sets.difference(FluentIterable.from(repos).uniqueIndex( + new Function<DataRecord, String>() { + @Override + @Nullable + public String apply(@Nullable DataRecord input) { + return SharedStoreRecordType.REPOSITORY.getIdFromName(input.getIdentifier().toString()); + } + }).keySet(), + FluentIterable.from(refs).uniqueIndex( + new Function<DataRecord, String>() { + @Override + @Nullable + public String apply(@Nullable DataRecord input) { + return SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString()); + } + }).keySet()); + } + + /** + * Encapsulates the different types of records at the data store root. + */ + public enum SharedStoreRecordType { + REFERENCES("references"), REPOSITORY("repository"); + + private final String type; + + SharedStoreRecordType(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public String getIdFromName(String name) { + return Splitter.on(DELIM).limit(2).splitToList(name).get(1); + } + + public String getNameFromId(String id) { + return Joiner.on(DELIM).join(getType(), id); + } + + static final String DELIM = "-"; + } +} + Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java (revision ) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java (revision ) @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.jackrabbit.oak.plugins.blob; + +import static org.hamcrest.CoreMatchers.instanceOf; + +import java.io.ByteArrayInputStream; +import java.util.UUID; + +import junit.framework.Assert; + +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; +import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests for SharedDataStoreUtils covering addition, retrieval and deletion of root records. + */ +public class SharedDataStoreUtilsTest { + SharedDataStore dataStore; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + try { + Assume.assumeThat(DataStoreUtils.getBlobStore(), instanceOf(SharedDataStore.class)); + } catch (Exception e) { + Assume.assumeNoException(e); + } + } + + @Test + public void test() throws Exception { + dataStore = DataStoreUtils.getBlobStore(); + String repoId1 = UUID.randomUUID().toString(); + String repoId2 = UUID.randomUUID().toString(); + + // Add repository records + dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]), + SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1)); + DataRecord repo1 = dataStore.getMetadataRecord(SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1)); + dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]), + SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2)); + DataRecord repo2 = dataStore.getMetadataRecord(SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2)); + + // Add reference records + dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]), + SharedStoreRecordType.REFERENCES.getNameFromId(repoId1)); + DataRecord rec1 = dataStore.getMetadataRecord(SharedStoreRecordType.REFERENCES.getNameFromId(repoId1)); + dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]), + SharedStoreRecordType.REFERENCES.getNameFromId(repoId2)); + DataRecord rec2 = dataStore.getMetadataRecord(SharedStoreRecordType.REFERENCES.getNameFromId(repoId2)); + + Assert.assertEquals( + SharedStoreRecordType.REPOSITORY.getIdFromName(repo1.getIdentifier().toString()), + repoId1); + Assert.assertEquals( + SharedStoreRecordType.REPOSITORY.getIdFromName(repo2.getIdentifier().toString()), + repoId2); + Assert.assertEquals( + SharedStoreRecordType.REFERENCES.getIdFromName(rec1.getIdentifier().toString()), + repoId1); + Assert.assertEquals( + SharedStoreRecordType.REFERENCES.getIdFromName(rec2.getIdentifier().toString()), + repoId2); + + // All the references from registered repositories are available + Assert.assertTrue( + SharedDataStoreUtils.refsNotAvailableFromRepos( + dataStore.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()), + dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType())).isEmpty()); + + // Earliest should be the 1st reference record + Assert.assertEquals( + SharedDataStoreUtils.getEarliestRecord( + dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType())).getIdentifier().toString(), + SharedStoreRecordType.REFERENCES.getNameFromId(repoId1)); + + // Delete references and check back if deleted + dataStore.deleteAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()); + Assert.assertTrue(dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()).isEmpty()); + + // Repository ids should still be available
+ Assert.assertEquals(2, + dataStore.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()).size()); + } +} + Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java (revision ) @@ -61,13 +61,13 @@ @Nonnull @Override - public CompositeData startBlobGC() { + public CompositeData startBlobGC(final boolean markOnly) { if (gcOp.isDone()) { gcOp = newManagementOperation(OP_NAME, new Callable<String>() { @Override public String call() throws Exception { long t0 = nanoTime(); - blobGarbageCollector.collectGarbage(); + blobGarbageCollector.collectGarbage(markOnly); return "Blob gc completed in " + formatTime(nanoTime() - t0); } }); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (revision ) @@ -19,14 +19,16 @@ import java.io.BufferedWriter; import java.io.Closeable; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; import java.sql.Timestamp; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -41,15 +43,20 @@ import com.google.common.collect.PeekingIterator; import com.google.common.io.Closeables; import com.google.common.io.Files; -import com.google.common.util.concurrent.ListenableFutureTask; -import com.google.common.util.concurrent.MoreExecutors; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.oak.commons.IOUtils; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + /** * Mark and sweep garbage collector. * @@ -72,9 +79,6 @@ /** The last modified time before current time of blobs to consider for garbage collection. */ private final long maxLastModifiedInterval; - /** Run concurrently when possible. */ - private final boolean runConcurrently; - /** The blob store to be garbage collected. */ private final GarbageCollectableBlobStore blobStore; @@ -89,6 +93,8 @@ /** The batch count.
*/ private final int batchCount; + private String repoId; + /** Flag to indicate the state of the gc **/ private State state = State.NOT_RUNNING; @@ -96,13 +102,15 @@ * Creates an instance of MarkSweepGarbageCollector * * @param marker BlobReferenceRetriever instance used to fetch referred blob entries + * @param blobStore the blob store instance + * @param executor executor * @param root the root absolute path of directory under which temporary * files would be created * @param batchCount batch size used for saving intermediate state - * @param runBackendConcurrently - run the backend iterate concurrently - * @param maxLastModifiedInterval - lastModifiedTime in millis. Only files with time + * @param maxLastModifiedInterval lastModifiedTime in millis. Only files with time * less than this time would be considered for GC - * @throws IOException Signals that an I/O exception has occurred. + * @param repositoryId unique repository id for this node + * @throws IOException */ public MarkSweepGarbageCollector( BlobReferenceRetriever marker, @@ -110,41 +118,43 @@ Executor executor, String root, int batchCount, - boolean runBackendConcurrently, - long maxLastModifiedInterval) + long maxLastModifiedInterval, + @Nullable String repositoryId) throws IOException { this.executor = executor; this.blobStore = blobStore; this.marker = marker; this.batchCount = batchCount; - this.runConcurrently = runBackendConcurrently; this.maxLastModifiedInterval = maxLastModifiedInterval; + this.repoId = repositoryId; - fs = new GarbageCollectorFileState(root); + fs = new GarbageCollectorFileState(root); } /** * Instantiates a new blob garbage collector. */ public MarkSweepGarbageCollector( - BlobReferenceRetriever marker, + BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore, - Executor executor) + Executor executor, + @Nullable String repositoryId) throws IOException { - this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, TimeUnit.HOURS.toMillis(24)); + this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, TimeUnit.HOURS + .toMillis(24), repositoryId); } public MarkSweepGarbageCollector( BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore, Executor executor, - long maxLastModifiedInterval) - throws IOException { - this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, maxLastModifiedInterval); + long maxLastModifiedInterval, + @Nullable String repositoryId) throws IOException { + this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, maxLastModifiedInterval, repositoryId); } @Override - public void collectGarbage() throws Exception { - markAndSweep(); + public void collectGarbage(boolean markOnly) throws Exception { + markAndSweep(markOnly); } /** @@ -157,22 +167,29 @@ } /** - * Mark and sweep. Main method for GC. + * Mark and sweep. Main entry method for GC. + * + * @param markOnly whether to mark only + * @throws Exception the exception */ - private void markAndSweep() throws IOException, InterruptedException { + private void markAndSweep(boolean markOnly) throws Exception { boolean threw = true; try { Stopwatch sw = Stopwatch.createStarted(); LOG.info("Starting Blob garbage collection"); mark(); + if (!markOnly) { - int deleteCount = sweep(); - threw = false; + int deleteCount = sweep(); + threw = false; - LOG.info("Blob garbage collection completed in {}.
Number of blobs " + + "deleted [{}]", sw.toString(), deleteCount); + } } finally { + if (LOG.isTraceEnabled()) { - Closeables.close(fs, threw); + Closeables.close(fs, threw); + } state = State.NOT_RUNNING; } } @@ -180,29 +197,16 @@ /** * Mark phase of the GC. */ - private void mark() throws IOException, InterruptedException { + private void mark() throws IOException, DataStoreException { state = State.MARKING; LOG.debug("Starting mark phase of the garbage collector"); - // Find all blobs available in the blob store - ListenableFutureTask blobIdRetriever = ListenableFutureTask.create(new BlobIdRetriever()); - if (runConcurrently) { - executor.execute(blobIdRetriever); - } else { - MoreExecutors.sameThreadExecutor().execute(blobIdRetriever); - } - - // Find all blob references after iterating over the whole repository + // Mark all used references iterateNodeTree(); - try { - blobIdRetriever.get(); - } catch (ExecutionException e) { - LOG.warn("Error occurred while fetching all the blobIds from the BlobStore. GC would " + - "continue with the blobIds retrieved so far", e.getCause()); - } + // Move the marked references file to the data store meta area if applicable + GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId); - difference(); LOG.debug("Ending mark phase of the garbage collector"); } @@ -248,16 +252,55 @@ /** * Sweep phase of gc candidate deletion. + *

+ * Performs the following steps depending upon the type of the blob store (refer to + * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type}): - * + * - * @throws IOException - * Signals that an I/O exception has occurred. + * <ul>
+ * <li>Shared
+ * <ul>
+ * <li>Merge all marked references (from the mark phase run independently) available in the data store meta
+ * store (from all configured independent repositories).</li>
+ * <li>Retrieve all blob ids available.</li>
+ * <li>Diff the 2 sets above to retrieve the list of blob ids not used.</li>
+ * <li>Delete only blobs last modified before
+ * (earliest time stamp of the marked references - #maxLastModifiedInterval) from the above set.</li>
+ * </ul>
+ * </li>
+ * <li>Default
+ * <ul>
+ * <li>Mark phase already run.</li>
+ * <li>Retrieve all blob ids available.</li>
+ * <li>Diff the 2 sets above to retrieve the list of blob ids not used.</li>
+ * <li>Delete only blobs last modified before
+ * (time stamp of the marked references - #maxLastModifiedInterval).</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * + * @return the number of blobs deleted + * @throws Exception the exception */ - private int sweep() throws IOException { + private int sweep() throws Exception { + long earliestRefAvailTime; + // Merge all the blob references available from all the reference files in the data store meta store + // Only go ahead if merge succeeded + try { + earliestRefAvailTime = + GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs); + } catch (Exception e) { + return 0; + } + + // Find all blob references after iterating over the whole repository + (new BlobIdRetriever()).call(); + + // Calculate the references not used + difference(); int count = 0; state = State.SWEEPING; LOG.debug("Starting sweep phase of the garbage collector"); LOG.debug("Sweeping blobs with modified time > than the configured max deleted time ({}). " + - timestampToString(getLastMaxModifiedTime())); + timestampToString(getLastMaxModifiedTime(earliestRefAvailTime))); ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>(); @@ -270,13 +313,13 @@ if (ids.size() > getBatchCount()) { count += ids.size(); - executor.execute(new Sweeper(ids, exceptionQueue)); + executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime)); ids = Lists.newArrayList(); } } if (!ids.isEmpty()) { count += ids.size(); - executor.execute(new Sweeper(ids, exceptionQueue)); + executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime)); } count -= exceptionQueue.size(); @@ -291,11 +334,13 @@ IOUtils.closeQuietly(writer); } if(!exceptionQueue.isEmpty()) { - LOG.warn("Unable to delete some blobs entries from the blob store. " + - "This may happen if blob modified time is > than the max deleted time ({}). " + - "Details around such blob entries can be found in [{}]", - timestampToString(getLastMaxModifiedTime()), fs.getGarbage().getAbsolutePath()); + LOG.warn( + "Unable to delete some blobs entries from the blob store. This may happen if blob modified time is > " + + "than the max deleted time ({}). Details around such blob entries can be found in [{}]", + timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)), fs.getGarbage().getAbsolutePath()); } + // Remove all the merged marked references + GarbageCollectionType.get(blobStore).removeAllMarkedReferences(blobStore); LOG.debug("Ending sweep phase of the garbage collector"); return count; } @@ -304,10 +349,10 @@ return batchCount; } - private long getLastMaxModifiedTime(){ + private long getLastMaxModifiedTime(long maxModified) { return maxLastModifiedInterval > 0 ? - System.currentTimeMillis() - maxLastModifiedInterval : 0; - + ((maxModified <= 0 ? System.currentTimeMillis() : maxModified) - maxLastModifiedInterval) : + 0; } /** @@ -331,16 +376,20 @@ /** The ids to sweep. */ private final List<String> ids; - public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue) { + private final long maxModified; + + public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue, + long maxModified) { this.exceptionQueue = exceptionQueue; this.ids = ids; + this.maxModified = maxModified; } @Override public void run() { try { LOG.debug("Blob ids to be deleted {}", ids); - boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime()); + boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime(maxModified)); if (!deleted) { // Only log and do not add to exception queue since some blobs may not match the // lastMaxModifiedTime criteria.
@@ -369,7 +418,7 @@ @Override public void addReference(String blobId) { if (debugMode) { - LOG.trace("BlobId : {}",blobId); + LOG.trace("BlobId : {}", blobId); } try { @@ -384,7 +433,7 @@ } if (debugMode) { - LOG.trace("chunkId : {}",id); + LOG.trace("chunkId : {}", id); } count.getAndIncrement(); } @@ -400,9 +449,9 @@ } ); LOG.info("Number of valid blob references marked under mark phase of " + - "Blob garbage collection [{}]",count.get()); + "Blob garbage collection [{}]", count.get()); // sort the marked references - fs.sort(fs.getMarkedRefs()); + GarbageCollectorFileState.sort(fs.getMarkedRefs()); } finally { IOUtils.closeQuietly(writer); } @@ -439,22 +488,20 @@ } // sort the file - fs.sort(fs.getAvailableRefs()); + GarbageCollectorFileState.sort(fs.getAvailableRefs()); LOG.debug("Number of blobs present in BlobStore : [{}] ", blobsCount); } finally { IOUtils.closeQuietly(bufferWriter); } return blobsCount; } - - } /** * FileLineDifferenceIterator class which iterates over the difference of 2 files line by line. */ - static class FileLineDifferenceIterator extends AbstractIterator<String> implements Closeable{ + static class FileLineDifferenceIterator extends AbstractIterator<String> implements Closeable { private final PeekingIterator<String> peekMarked; private final LineIterator marked; private final LineIterator all; @@ -526,5 +573,108 @@ */ private static String timestampToString(long timestamp){ return (new Timestamp(timestamp) + "00").substring(0, 23); + } + + /** + * Defines different data store types from the garbage collection perspective and encodes the divergent behavior. + *
+ */ + enum GarbageCollectionType { + SHARED { + /** + * Removes the marked references from the blob store root. The DEFAULT implementation is a NOOP. + * + * @param blobStore the blobStore instance + */ + @Override + void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) { + ((SharedDataStore) blobStore).deleteAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()); + } + + /** + * Merge all marked references available from all repositories and return the earliest time of the references. + * + * @param blobStore the blob store + * @param fs the garbage collector file state + * @return the earliest time of the available references + * @throws IOException Signals that an I/O exception has occurred. + * @throws DataStoreException the data store exception + */ + @Override + long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, + GarbageCollectorFileState fs) + throws IOException, DataStoreException { + + List<DataRecord> refFiles = + ((SharedDataStore) blobStore).getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()); + + // Get all the repositories registered + List<DataRecord> repoFiles = + ((SharedDataStore) blobStore).getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()); + + // Retrieve repos for which reference files have not been created + Set<String> unAvailRepos = + SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles); + if (unAvailRepos.isEmpty()) { + // List of files to be merged + List<File> files = Lists.newArrayList(); + for (DataRecord refFile : refFiles) { + File file = GarbageCollectorFileState.copy(refFile.getStream()); + files.add(file); + } + + GarbageCollectorFileState.merge(files, fs.getMarkedRefs()); + + return SharedDataStoreUtils.getEarliestRecord(refFiles).getLastModified(); + } else { + LOG.error("Not all repositories have marked references available : {}", unAvailRepos); + throw new IOException("Not all repositories have marked references available"); + } + } + + /** + * Adds the marked references to the blob store root. The DEFAULT implementation is a NOOP. + * + * @param blobStore the blob store + * @param fs the garbage collector file state + * @param repoId the repo id + * @throws DataStoreException the data store exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Override + void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, + String repoId) throws DataStoreException, IOException { + InputStream is = new FileInputStream(fs.getMarkedRefs()); + try { + ((SharedDataStore) blobStore) + .addMetadataRecord(is, SharedStoreRecordType.REFERENCES.getNameFromId(repoId)); + } finally { + Closeables.close(is, false); + } + } + }, + DEFAULT; + + void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) {} + + void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, + String repoId) throws DataStoreException, IOException {} + + long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, + GarbageCollectorFileState fs) + throws IOException, DataStoreException { + // throw if the marked refs are not available.
+ if (!fs.getMarkedRefs().exists() || fs.getMarkedRefs().length() == 0) { + throw new IOException("Marked references not available"); + } + return fs.getMarkedRefs().lastModified(); + } + + public static GarbageCollectionType get(GarbageCollectableBlobStore blobStore) { + if (SharedDataStoreUtils.isShared(blobStore)) { + return SHARED; + } + return DEFAULT; + } } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (revision ) @@ -19,6 +19,9 @@ package org.apache.jackrabbit.oak.plugins.blob.datastore; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterators.filter; +import static com.google.common.collect.Iterators.transform; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; @@ -48,21 +51,20 @@ import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.core.data.MultiDataStoreAware; import org.apache.jackrabbit.oak.cache.CacheLIRS; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.collect.Iterators.filter; -import static com.google.common.collect.Iterators.transform; /** * BlobStore wrapper for DataStore. 
Wraps a Jackrabbit 2 DataStore and exposes it as a BlobStore. * It also handles inlining binaries if their size is smaller than * {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()} */ -public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollectableBlobStore { +public class DataStoreBlobStore implements DataStore, SharedDataStore, BlobStore, + GarbageCollectableBlobStore { private final Logger log = LoggerFactory.getLogger(getClass()); private final DataStore delegate; @@ -355,12 +357,8 @@ }), new Predicate<DataRecord>() { @Override public boolean apply(@Nullable DataRecord input) { - if (input != null && (maxLastModifiedTime <= 0 - || input.getLastModified() < maxLastModifiedTime)) { - return true; + return input != null && (maxLastModifiedTime <= 0 || input.getLastModified() < maxLastModifiedTime); - } + } - return false; - } }), new Function<DataRecord, String>() { @Override public String apply(DataRecord input) { @@ -392,6 +390,48 @@ @Override public Iterator<String> resolveChunks(String blobId) throws IOException { return Iterators.singletonIterator(blobId); + } + + @Override + public void addMetadataRecord(InputStream stream, String name) throws DataStoreException { + if (delegate instanceof SharedDataStore) { + ((SharedDataStore) delegate).addMetadataRecord(stream, name); + } + } + + @Override public DataRecord getMetadataRecord(String name) { + if (delegate instanceof SharedDataStore) { + return ((SharedDataStore) delegate).getMetadataRecord(name); + } + return null; + } + + @Override + public List<DataRecord> getAllMetadataRecords(String prefix) { + if (delegate instanceof SharedDataStore) { + return ((SharedDataStore) delegate).getAllMetadataRecords(prefix); + } + return null; + } + + @Override + public boolean deleteMetadataRecord(String name) { + return delegate instanceof SharedDataStore && ((SharedDataStore) delegate).deleteMetadataRecord(name); + } + + @Override + public void deleteAllMetadataRecords(String prefix) { + if (delegate instanceof SharedDataStore) { + ((SharedDataStore) delegate).deleteAllMetadataRecords(prefix); + } + } + + @Override + public Type getType() { + if (delegate instanceof SharedDataStore) { + return Type.SHARED; + } + return Type.DEFAULT; } //~---------------------------------------------< Object > Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (date 1423545306000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (revision ) @@ -16,6 +16,9 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY; +import static org.junit.Assert.assertTrue; + import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.HashSet; @@ -33,7 +36,10 @@ import junit.framework.Assert; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import
org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; @@ -56,7 +62,7 @@ NodeBuilder a = s.getRoot().builder(); int number = 10; - int maxDeleted = 5; + int maxDeleted = 5; // track the number of the assets to be deleted List<Integer> processed = Lists.newArrayList(); Random rand = new Random(47); @@ -147,12 +153,19 @@ } private void gc(HashSet<String> remaining) throws Exception { DocumentNodeStore store = mk.getNodeStore(); + String repoId = null; + if (SharedDataStoreUtils.isShared(store.getBlobStore())) { + repoId = ClusterRepositoryInfo.createId(store); + ((SharedDataStore) store.getBlobStore()).addMetadataRecord( + new ByteArrayInputStream(new byte[0]), + REPOSITORY.getNameFromId(repoId)); + } MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector( new DocumentBlobReferenceRetriever(store), (GarbageCollectableBlobStore) store.getBlobStore(), MoreExecutors.sameThreadExecutor(), - "./target", 5, true, 0); - gc.collectGarbage(); + "./target", 5, 0, repoId); + gc.collectGarbage(false); Set<String> existingAfterGC = iterate(); boolean empty = Sets.symmetricDifference(remaining, existingAfterGC).isEmpty(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryManagementMBean.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryManagementMBean.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryManagementMBean.java (revision ) @@ -128,12 +128,13 @@ CompositeData getRestoreStatus(); /** - * Initiate a data store garbage collection operation + * Initiate a data store garbage collection operation. * + * @param markOnly whether to only mark references and not sweep in the mark and sweep operation. * @return the status of the operation right after it was initiated */ @Nonnull - CompositeData startDataStoreGC(); + CompositeData startDataStoreGC(boolean markOnly); /** * Data store garbage collection status Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java (revision ) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java (revision ) @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.jackrabbit.oak.plugins.document; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import junit.framework.Assert; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; + +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; +import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats; +import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.stats.Clock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test for gc in a shared data store among heterogeneous oak node stores. + */ +public class SharedBlobStoreGCTest { + private Cluster cluster1; + private Cluster cluster2; + private Clock clock; + + @Before + public void setUp() throws Exception { + clock = new Clock.Virtual(); + clock.waitUntil(Revision.getCurrentTimestamp()); + DataStoreUtils.time = clock.getTime(); + + BlobStore blobStore1 = DataStoreUtils.getBlobStore(); + DocumentNodeStore ds1 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(new MemoryDocumentStore()) + .setBlobStore(blobStore1) + .clock(clock) + .getNodeStore(); + String repoId1 = ClusterRepositoryInfo.createId(ds1); + // Register the unique repository id in the data store + ((SharedDataStore) blobStore1).addMetadataRecord(new ByteArrayInputStream(new byte[0]), + SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1)); + + BlobStore blobStore2 = DataStoreUtils.getBlobStore(); + DocumentNodeStore ds2 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(new MemoryDocumentStore()) + .setBlobStore(blobStore2) + .clock(clock) + .getNodeStore(); + String repoId2 = ClusterRepositoryInfo.createId(ds2); + // Register the unique repository id in the data store + ((SharedDataStore) blobStore2).addMetadataRecord(new ByteArrayInputStream(new byte[0]), + SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2)); + + cluster1 = new Cluster(ds1, repoId1, 20); + cluster1.init(); + cluster2 = new Cluster(ds2, repoId2, 100); + cluster2.init(); + } + + static InputStream randomStream(int seed, int size) { + Random r = new Random(seed); + byte[] data = new byte[size]; + r.nextBytes(data); + return new ByteArrayInputStream(data); + } + + @Test + public void testGC() throws Exception { + // Only run the mark phase on both the clusters + cluster1.gc.collectGarbage(true); + cluster2.gc.collectGarbage(true); + + // Execute the gc with sweep + cluster1.gc.collectGarbage(false); + +
Assert.assertTrue(Sets.symmetricDifference(Sets.union(cluster1.getInitBlobs(), cluster2.getInitBlobs()), + cluster1.getExistingBlobIds()).isEmpty()); + } + + @Test + // GC should fail + public void testOnly1ClusterMark() throws Exception { + // Only run the mark phase on one cluster + cluster1.gc.collectGarbage(true); + + // Execute the gc with sweep + cluster1.gc.collectGarbage(false); + + Assert.assertTrue( + (cluster1.getInitBlobs().size() + cluster2.getInitBlobs().size()) < cluster1.getExistingBlobIds().size()); + Set<String> existing = cluster1.getExistingBlobIds(); + Assert.assertTrue(existing.containsAll(cluster2.getInitBlobs())); + Assert.assertTrue(existing.containsAll(cluster1.getInitBlobs())); + } + + @Test + public void testRepeatedMarkWithSweep() throws Exception { + // Only run the mark phase on one cluster + cluster1.gc.collectGarbage(true); + cluster2.gc.collectGarbage(true); + cluster2.gc.collectGarbage(true); + + // Execute the gc with sweep + cluster2.gc.collectGarbage(false); + + Assert.assertTrue(Sets.symmetricDifference( + Sets.union(cluster1.getInitBlobs(), cluster2.getInitBlobs()), + cluster1.getExistingBlobIds()).isEmpty()); + } + + @After + public void tearDown() { + DataStoreUtils.time = 0; + } + + class Cluster { + private DocumentNodeStore ds; + private int seed; + private BlobGarbageCollector gc; + + private Set<String> initBlobs = new HashSet<String>(); + + protected Set<String> getInitBlobs() { + return initBlobs; + } + + public Cluster(final DocumentNodeStore ds, final String repoId, int seed) + throws IOException { + this.ds = ds; + this.gc = new BlobGarbageCollector() { + @Override + public void collectGarbage(boolean markOnly) throws Exception { + MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector( + new DocumentBlobReferenceRetriever(ds), + (GarbageCollectableBlobStore) ds.getBlobStore(), + MoreExecutors.sameThreadExecutor(), + "./target", 5, 0, repoId); + gc.collectGarbage(markOnly); + } + }; + + this.seed = seed; + } + + /** + * Creates the setup load with deletions. + * + * @throws Exception + */ + public void init() throws Exception { + NodeBuilder a = ds.getRoot().builder(); + + int number = 10; + // track the number of the assets to be deleted + List<Integer> deletes = Lists.newArrayList(); + Random rand = new Random(47); + for (int i = 0; i < 5; i++) { + int n = rand.nextInt(number); + if (!deletes.contains(n)) { + deletes.add(n); + } + } + for (int i = 0; i < number; i++) { + Blob b = ds.createBlob(randomStream(i + seed, 4160)); + if (!deletes.contains(i)) { + Iterator<String> idIter = + ((GarbageCollectableBlobStore) ds.getBlobStore()) + .resolveChunks(b.toString()); + while (idIter.hasNext()) { + initBlobs.add(idIter.next()); + } + } + a.child("c" + i).setProperty("x", b); + } + ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + a = ds.getRoot().builder(); + for (int id : deletes) { + a.child("c" + id).remove(); + ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + long maxAge = 10; // hours + //
Go past the version GC age so the removed documents can be collected + clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(maxAge)); + + VersionGarbageCollector vGC = ds.getVersionGarbageCollector(); + VersionGCStats stats = vGC.gc(0, TimeUnit.MILLISECONDS); + Assert.assertEquals(deletes.size(), stats.deletedDocGCCount); + } + + public Set<String> getExistingBlobIds() throws Exception { + GarbageCollectableBlobStore store = (GarbageCollectableBlobStore) ds.getBlobStore(); + Iterator<String> cur = store.getAllChunkIds(0); + + Set<String> existing = Sets.newHashSet(); + while (cur.hasNext()) { + existing.add(cur.next()); + } + return existing; + } + } +} + Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision ) @@ -1973,11 +1973,13 @@ * Creates and returns a MarkSweepGarbageCollector if the current BlobStore * supports garbage collection * - * @return garbage collector of the BlobStore supports GC otherwise null * @param blobGcMaxAgeInSecs + * @param repositoryId + * @return garbage collector if the BlobStore supports GC, otherwise null */ @CheckForNull - public MarkSweepGarbageCollector createBlobGarbageCollector(long blobGcMaxAgeInSecs) { + public MarkSweepGarbageCollector createBlobGarbageCollector(long blobGcMaxAgeInSecs, + String repositoryId) { MarkSweepGarbageCollector blobGC = null; if(blobStore instanceof GarbageCollectableBlobStore){ try { @@ -1985,7 +1987,8 @@ new DocumentBlobReferenceRetriever(this), (GarbageCollectableBlobStore) blobStore, executor, - TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs)); + TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs), + repositoryId); } catch (IOException e) { throw new RuntimeException("Error occurred while initializing " + "the MarkSweepGarbageCollector",e); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/management/RepositoryManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/management/RepositoryManager.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/management/RepositoryManager.java (revision ) @@ -123,12 +123,12 @@ } @Override - public CompositeData startDataStoreGC() { + public CompositeData startDataStoreGC(final boolean markOnly) { return execute(BlobGCMBean.class, new Function<BlobGCMBean, Status>() { @Nonnull @Override public Status apply(BlobGCMBean blobGCService) { - return fromCompositeData(blobGCService.startBlobGC()); + return fromCompositeData(blobGCService.startBlobGC(markOnly)); } }).toCompositeData(); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java (date 1423545306000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java (revision ) @@ -32,12 +32,12 @@ String TYPE = "BlobGarbageCollection"; /** - * Initiate a
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java	(date 1423545306000)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java	(revision )
@@ -32,12 +32,12 @@
     String TYPE = "BlobGarbageCollection";
 
     /**
-     * Initiate a data store garbage collection operation
+     * Initiate a data store garbage collection operation.
      *
+     * @param markOnly whether to only mark references and not sweep in the mark and sweep operation.
      * @return the status of the operation right after it was initiated
      */
-    @Nonnull
-    CompositeData startBlobGC();
+    CompositeData startBlobGC(boolean markOnly);
 
     /**
     * Data store garbage collection status
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java	(date 1423545306000)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java	(revision )
@@ -22,9 +22,11 @@
 public interface BlobGarbageCollector {
 
     /**
-     * Collect garbage blobs from the passed node store instance.
+     * Marks garbage blobs from the passed node store instance.
+     * Sweeps (deletes) them only if markOnly is false.
      *
-     * @throws Exception
+     * @param markOnly whether to only mark references and not sweep in the mark and sweep operation.
+     * @throws Exception if the garbage collection fails
      */
-    void collectGarbage() throws Exception;
+    void collectGarbage(boolean markOnly) throws Exception;
-}
\ No newline at end of file
+}
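The markOnly flag is what lets garbage collection be coordinated across repositories sharing one data store. A sketch of the intended call sequence, assuming collector instances for two repositories are already wired up (e.g. via MarkSweepGarbageCollector); the class and parameter names here are illustrative:

    import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;

    public class SharedDataStoreGC {
        // Phase 1: every repository sharing the data store marks the blob
        // references it still needs; nothing is deleted in this phase.
        // Phase 2: any one repository runs the full mark-and-sweep; the
        // sweep only removes blobs unreferenced by all recorded marks.
        public static void twoPhase(BlobGarbageCollector repoA,
                                    BlobGarbageCollector repoB) throws Exception {
            repoA.collectGarbage(true);   // mark only
            repoB.collectGarbage(true);   // mark only
            repoA.collectGarbage(false);  // mark and sweep
        }
    }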
repositoryId", e);
+            }
+        }
+
         if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
             BlobGarbageCollector gc = new BlobGarbageCollector() {
                 @Override
-                public void collectGarbage() throws Exception {
+                public void collectGarbage(boolean markOnly) throws Exception {
                     MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                             new SegmentBlobReferenceRetriever(store.getTracker()),
                             (GarbageCollectableBlobStore) store.getBlobStore(),
-                            executor);
-                    gc.collectGarbage();
+                            executor,
+                            ClusterRepositoryInfo.getId(delegate));
+                    gc.collectGarbage(markOnly);
                 }
             };
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java	(date 1423545306000)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java	(revision )
@@ -24,6 +24,7 @@
 import org.apache.jackrabbit.core.data.FileDataStore;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
 import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
 import org.junit.Test;
@@ -46,8 +47,9 @@
     private static final String DS_PROP_PREFIX = "ds.";
     private static final String BS_PROP_PREFIX = "bs.";
 
+    public static long time;
 
     public static DataStoreBlobStore getBlobStore() throws Exception {
-        String className = System.getProperty(DS_CLASS_NAME, FileDataStore.class.getName());
+        String className = System.getProperty(DS_CLASS_NAME, OakFileDataStore.class.getName());
         DataStore ds = Class.forName(className).asSubclass(DataStore.class).newInstance();
         PropertiesUtil.populate(ds, getConfig(), false);
         ds.init(getHomeDir());
@@ -67,7 +69,8 @@
     }
 
     private static String getHomeDir() {
-        return concat(new File(".").getAbsolutePath(), "target/blobstore/" + System.currentTimeMillis());
+        return concat(new File(".").getAbsolutePath(), "target/blobstore/"
+                + (time == 0 ? System.currentTimeMillis() : time));
     }
 
     @Test
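The new time field makes getHomeDir() deterministic. A plausible test usage (assumed, not shown in this patch) is to pin it so that two blob store instances resolve to the same directory and therefore share one underlying data store:

    import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
    import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils;

    public class SharedStoreSetup {
        // Pinning DataStoreUtils.time keeps getHomeDir() stable across calls,
        // so both stores open the same target/blobstore/<time> directory and
        // behave like two repositories sharing a single data store in tests.
        public static DataStoreBlobStore[] createSharedPair() throws Exception {
            DataStoreUtils.time = System.currentTimeMillis();
            DataStoreBlobStore first = DataStoreUtils.getBlobStore();
            DataStoreBlobStore second = DataStoreUtils.getBlobStore();
            return new DataStoreBlobStore[] {first, second};
        }
    }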
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java	(revision )
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java	(revision )
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.identifier;
+
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+/**
+ * Utility class to manage a unique cluster/repository id for the cluster.
+ */
+public class ClusterRepositoryInfo {
+    public static final String CLUSTER_CONFIG_NODE = ":clusterConfig";
+    public static final String CLUSTER_ID_PROP = ":clusterId";
+
+    /**
+     * Creates a new UUID for the repository and stores it in the property
+     * /:clusterConfig/:clusterId, unless an id has already been registered,
+     * in which case the existing id is returned.
+     *
+     * @param store the NodeStore instance
+     * @return the repository id
+     * @throws CommitFailedException if the id could not be persisted
+     */
+    public static String createId(NodeStore store) throws CommitFailedException {
+        NodeBuilder root = store.getRoot().builder();
+        if (!root.hasChildNode(CLUSTER_CONFIG_NODE)) {
+            String id = UUID.randomUUID().toString();
+            root.child(CLUSTER_CONFIG_NODE).setProperty(CLUSTER_ID_PROP, id);
+            store.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return id;
+        } else {
+            return root.getChildNode(CLUSTER_CONFIG_NODE).getProperty(CLUSTER_ID_PROP).getValue(Type.STRING);
+        }
+    }
+
+    /**
+     * Retrieves the {@link #CLUSTER_ID_PROP} registered for the repository.
+     *
+     * @param store the NodeStore instance
+     * @return the repository id
+     */
+    public static String getId(NodeStore store) {
+        return store.getRoot().getChildNode(CLUSTER_CONFIG_NODE).getProperty(CLUSTER_ID_PROP).getValue(Type.STRING);
+    }
+}
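A short sketch of the createId/getId contract added above, using a MemoryNodeStore purely for illustration (the test harness around it is assumed):

    import org.apache.jackrabbit.oak.api.CommitFailedException;
    import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
    import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
    import org.apache.jackrabbit.oak.spi.state.NodeStore;

    public class ClusterIdCheck {
        // createId persists a random UUID under :clusterConfig/:clusterId on
        // first use and returns the stored value on every subsequent call,
        // so repeated calls and getId all agree. Run with -ea for asserts.
        public static void main(String[] args) throws CommitFailedException {
            NodeStore store = new MemoryNodeStore();
            String id = ClusterRepositoryInfo.createId(store);
            assert id.equals(ClusterRepositoryInfo.createId(store));
            assert id.equals(ClusterRepositoryInfo.getId(store));
        }
    }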