Index: oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryManagementMBean.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryManagementMBean.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryManagementMBean.java (working copy) @@ -194,4 +194,12 @@ @Nonnull CompositeData getPropertyIndexAsyncReindexStatus(); + /** + * Initiate a data store garbage collection operation. + * + * @param markOnly whether to mark only + * @return the status of the operation right after it was initiated + */ + @Nonnull + CompositeData startDataStoreGC(boolean markOnly); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/management/RepositoryManager.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/management/RepositoryManager.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/management/RepositoryManager.java (working copy) @@ -123,17 +123,22 @@ } @Override - public CompositeData startDataStoreGC() { + public CompositeData startDataStoreGC(final boolean markOnly) { return execute(BlobGCMBean.class, new Function() { @Nonnull @Override public Status apply(BlobGCMBean blobGCService) { - return fromCompositeData(blobGCService.startBlobGC()); + return fromCompositeData(blobGCService.startBlobGC(markOnly)); } }).toCompositeData(); } @Override + public CompositeData startDataStoreGC() { + return startDataStoreGC(false); + } + + @Override public CompositeData getDataStoreGCStatus() { return execute(BlobGCMBean.class, new Function() { @Nonnull Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (working copy) @@ -27,4 +27,13 @@ * @throws Exception */ void collectGarbage() throws Exception; -} \ No newline at end of file + + /** + * Marks garbage blobs from the passed node store instance. + * Collects them only if markOnly is false. 
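Operationally, the new flag surfaces through the RepositoryManagement MBean, so a mark-only run can be triggered from any plain JMX client. A minimal sketch follows; the service URL and ObjectName are assumptions for illustration, not part of this patch:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MarkOnlyGcClient {
    public static void main(String[] args) throws Exception {
        // hypothetical endpoint; use whatever your deployment actually exposes
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection server = connector.getMBeanServerConnection();
            // hypothetical ObjectName; look up the name the whiteboard registers
            ObjectName name = new ObjectName(
                    "org.apache.jackrabbit.oak:type=RepositoryManagement,name=repository manager");
            // true = mark phase only: references are collected and persisted, nothing is deleted
            CompositeData status = (CompositeData) server.invoke(name,
                    "startDataStoreGC", new Object[] {Boolean.TRUE}, new String[] {"boolean"});
            System.out.println(status);
        } finally {
            connector.close();
        }
    }
}
```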
+ * + * @param markOnly whether to run mark only phase + * @throws Exception the exception + */ + void collectGarbage(boolean markOnly) throws Exception; +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java (working copy) @@ -61,13 +61,13 @@ @Nonnull @Override - public CompositeData startBlobGC() { + public CompositeData startBlobGC(final boolean markOnly) { if (gcOp.isDone()) { gcOp = newManagementOperation(OP_NAME, new Callable() { @Override public String call() throws Exception { long t0 = nanoTime(); - blobGarbageCollector.collectGarbage(); + blobGarbageCollector.collectGarbage(markOnly); return "Blob gc completed in " + formatTime(nanoTime() - t0); } }); @@ -81,4 +81,10 @@ public CompositeData getBlobGCStatus() { return gcOp.getStatus().toCompositeData(); } + + @Override + @Nonnull + public CompositeData startBlobGC() { + return startBlobGC(false); + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java (working copy) @@ -48,4 +48,11 @@ @Nonnull CompositeData getBlobGCStatus(); + /** + * Initiate a data store garbage collection operation. + * + * @param markOnly whether to mark only + * @return the status of the operation right after it was initiated + */ + CompositeData startBlobGC(boolean markOnly); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (working copy) @@ -19,6 +19,9 @@ package org.apache.jackrabbit.oak.plugins.blob.datastore; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterators.filter; +import static com.google.common.collect.Iterators.transform; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; @@ -48,14 +51,12 @@ import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.core.data.MultiDataStoreAware; import org.apache.jackrabbit.oak.cache.CacheLIRS; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.collect.Iterators.filter; -import static com.google.common.collect.Iterators.transform; /** * BlobStore wrapper for DataStore. 
* Wraps a Jackrabbit 2 DataStore and exposes it as a BlobStore.
@@ -62,7 +63,8 @@
 * It also handles inlining binaries if their size is smaller than
 * {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()}
 */
-public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollectableBlobStore {
+public class DataStoreBlobStore implements DataStore, SharedDataStore, BlobStore,
+        GarbageCollectableBlobStore {
     private final Logger log = LoggerFactory.getLogger(getClass());

     private final DataStore delegate;
@@ -541,4 +543,43 @@
             return new BlobId(dr);
         }
     }
+
+    @Override
+    public DataRecord addRootRecord(InputStream stream, String name) throws DataStoreException {
+        if (delegate instanceof SharedDataStore) {
+            return ((SharedDataStore) delegate).addRootRecord(stream, name);
+        }
+        return null;
+    }
+
+    @Override
+    public List<DataRecord> getAllRootRecords(String prefix) {
+        if (delegate instanceof SharedDataStore) {
+            return ((SharedDataStore) delegate).getAllRootRecords(prefix);
+        }
+        return null;
+    }
+
+    @Override
+    public boolean deleteRootRecord(String name) {
+        if (delegate instanceof SharedDataStore) {
+            return ((SharedDataStore) delegate).deleteRootRecord(name);
+        }
+        return false;
+    }
+
+    @Override
+    public void deleteAllRootRecords(String prefix) {
+        if (delegate instanceof SharedDataStore) {
+            ((SharedDataStore) delegate).deleteAllRootRecords(prefix);
+        }
+    }
+
+    @Override
+    public Type getType() {
+        if (delegate instanceof SharedDataStore) {
+            return Type.SHARED;
+        }
+        return Type.DEFAULT;
+    }
 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/OakFileDataStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/OakFileDataStore.java (revision 1615951)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/OakFileDataStore.java (working copy)
@@ -20,10 +20,15 @@
 package org.apache.jackrabbit.oak.plugins.blob.datastore;

 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.ref.WeakReference;
 import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;

 import com.google.common.base.Charsets;
@@ -30,16 +35,27 @@
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.io.BaseEncoding;
+import com.google.common.io.Closeables;
 import com.google.common.io.Files;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.FileDataRecord;
 import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
 * Oak specific extension of JR2 FileDataStore which enables
 * provisioning the signing key via OSGi config
 */
-public class OakFileDataStore extends FileDataStore {
+public class OakFileDataStore extends FileDataStore implements SharedDataStore {
+    public static final Logger LOG = LoggerFactory.getLogger(OakFileDataStore.class);
+
     private byte[] referenceKey;

     public OakFileDataStore() {
@@ -51,11 +67,12 @@

     @Override
     public
Iterator getAllIdentifiers() { + final String path = FilenameUtils.normalizeNoEndSeparator(getPath()); return Files.fileTreeTraverser().postOrderTraversal(new File(getPath())) .filter(new Predicate() { @Override public boolean apply(File input) { - return input.isFile(); + return input.isFile() && !input.getParent().equals(path); } }) .transform(new Function() { @@ -117,4 +134,82 @@ return Collections.emptySet(); } } + + @Override + public DataRecord addRootRecord(InputStream input, String name) + throws DataStoreException { + try { + File file = new File(getPath(), name); + FileOutputStream os = new FileOutputStream(file); + try { + IOUtils.copyLarge(input, os); + } finally { + Closeables.close(os, true); + Closeables.close(input, true); + } + + return new FileDataRecord(this, + new DataIdentifier(name), + file); + } catch (IOException e) { + LOG.error("Exception while adding root record with name {}, {}", + new Object[] {name, e}); + throw new DataStoreException("Could not add root record", e); + } + } + + @Override + public List getAllRootRecords(String prefix) { + File root = new File(getPath()); + List rootRecords = new ArrayList(); + for (File file : FileFilterUtils.filterList( + FileFilterUtils.prefixFileFilter(prefix), + root.listFiles())) { + if (!file.isDirectory()) { // skip directories which are actual data store files + rootRecords.add( + new FileDataRecord(this, new DataIdentifier(file.getName()), file)); + } + } + return rootRecords; + } + + @Override + public boolean deleteRootRecord(String name) { + File root = new File(getPath()); + + for (File file : FileFilterUtils.filterList( + FileFilterUtils.nameFileFilter(name), + root.listFiles())) { + if (!file.isDirectory()) { // skip directories which are actual data store files + if (!file.delete()) { + LOG.warn("Failed to delete root record {} ", new Object[] {file + .getAbsolutePath()}); + } else { + return true; + } + } + } + return false; + } + + @Override + public void deleteAllRootRecords(String prefix) { + File root = new File(getPath()); + + for (File file : FileFilterUtils.filterList( + FileFilterUtils.prefixFileFilter(prefix), + root.listFiles())) { + if (!file.isDirectory()) { // skip directories which are actual data store files + if (!file.delete()) { + LOG.warn("Failed to delete root record {} ", new Object[] {file + .getAbsolutePath()}); + } + } + } + } + + @Override + public Type getType() { + return Type.DEFAULT; + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (working copy) @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType;
+
+/**
+ * Utility class for {@link SharedDataStore}.
+ */
+public class SharedDataStoreUtils {
+
+    /**
+     * Adds a root record.
+     *
+     * @param dataStore the data store to which the record is added
+     * @param type the type of the record
+     * @param stream the record stream
+     * @param id the id of the record
+     * @return the data record
+     * @throws DataStoreException the data store exception
+     */
+    public static DataRecord addRootRecord(SharedDataStore dataStore, SharedStoreRecordType type,
+            InputStream stream, String id)
+            throws DataStoreException {
+        return dataStore.addRootRecord(stream, type.getNameFromId(id));
+    }
+
+    /**
+     * Gets the root records of the given type.
+     *
+     * @param dataStore the data store
+     * @param record the record type
+     * @return the root records
+     * @throws DataStoreException the data store exception
+     */
+    public static List<DataRecord> getRootRecords(SharedDataStore dataStore,
+            SharedStoreRecordType record)
+            throws DataStoreException {
+        return dataStore.getAllRootRecords(record.getType());
+    }
+
+    /**
+     * Deletes all root records of the given type.
+     *
+     * @param dataStore the data store
+     * @param type the record type
+     */
+    public static void deleteRootRecords(SharedDataStore dataStore, SharedStoreRecordType type) {
+        dataStore.deleteAllRootRecords(type.getType());
+    }
+
+    /**
+     * Gets the earliest of the available reference records.
+     *
+     * @param recs the list of reference records
+     * @return the record with the smallest last-modified time
+     */
+    public static DataRecord getEarliestRecord(List<DataRecord> recs) {
+        return Ordering.natural().onResultOf(
+                new Function<DataRecord, Long>() {
+                    @Override
+                    @Nullable
+                    public Long apply(@Nullable DataRecord input) {
+                        return input.getLastModified();
+                    }
+                }).min(recs);
+    }
+
+    /**
+     * Determines the repositories for which no marked references are available.
+     *
+     * @param repos the repository records
+     * @param refs the reference records
+     * @return the set of repository ids whose references are not available
+     */
+    public static Set<String> refsNotAvailableFromRepos(List<DataRecord> repos,
+            List<DataRecord> refs) {
+        return Sets.difference(FluentIterable.from(repos).uniqueIndex(
+                new Function<DataRecord, String>() {
+                    @Override
+                    @Nullable
+                    public String apply(@Nullable DataRecord input) {
+                        return SharedStoreRecordType.REPOSITORY.getIdFromName(input
+                                .getIdentifier()
+                                .toString());
+                    }
+                }).keySet(),
+                FluentIterable.from(refs).uniqueIndex(
+                        new Function<DataRecord, String>() {
+                            @Override
+                            @Nullable
+                            public String apply(@Nullable DataRecord input) {
+                                return SharedStoreRecordType.REFERENCES.getIdFromName(input
+                                        .getIdentifier()
+                                        .toString());
+                            }
+                        }).keySet());
+    }
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (revision 1615951)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (working copy)
@@ -18,12 +18,16 @@

 import java.io.Closeable;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Comparator;
+import java.util.List;

 import com.google.common.io.Files;

 import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.commons.sort.ExternalSort;

 /**
@@ -47,6 +51,13 @@
     /** The garbage stores the garbage collection candidates which were not deleted. */
     private final File garbage;

+    private static final Comparator<String> lexComparator =
+            new Comparator<String>() {
+                @Override
+                public int compare(String s1, String s2) {
+                    return s1.compareTo(s2);
+                }
+            };

     /**
     * Instantiates a new garbage collector file state.
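The reason both the marked-references file and the available-blobs file are kept lexicographically sorted is that their difference can then be computed in a single merge pass, which is what FileLineDifferenceIterator does further down in MarkSweepGarbageCollector. A self-contained sketch of that idea (illustrative only, not the patch's code):

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SortedFileDiff {
    /** Returns the lines of 'all' that do not occur in 'marked'; both files must be sorted. */
    public static List<String> diff(File marked, File all) throws IOException {
        List<String> garbage = new ArrayList<String>();
        BufferedReader m = new BufferedReader(new FileReader(marked));
        BufferedReader a = new BufferedReader(new FileReader(all));
        try {
            String mLine = m.readLine();
            String aLine = a.readLine();
            while (aLine != null) {
                int cmp = (mLine == null) ? 1 : mLine.compareTo(aLine);
                if (cmp == 0) {           // blob is referenced: keep it, advance both files
                    mLine = m.readLine();
                    aLine = a.readLine();
                } else if (cmp < 0) {     // reference with no matching blob: skip it
                    mLine = m.readLine();
                } else {                  // blob with no reference: garbage collection candidate
                    garbage.add(aLine);
                    aLine = a.readLine();
                }
            }
        } finally {
            m.close();
            a.close();
        }
        return garbage;
    }
}
```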
@@ -118,21 +129,26 @@ * * @param file file whose contents needs to be sorted */ - public void sort(File file) throws IOException { + public static void sort(File file) throws IOException { File sorted = createTempFile(); - Comparator lexComparator = new Comparator() { - @Override - public int compare(String s1, String s2) { - return s1.compareTo(s2); - } - }; - ExternalSort.mergeSortedFiles( - ExternalSort.sortInBatch(file, lexComparator, true), - sorted, lexComparator, true); + merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted); Files.move(sorted, file); } + + public static void merge(List files, File output) throws IOException { + ExternalSort.mergeSortedFiles( + files, + output, lexComparator, true); + } + + public static File copy(InputStream stream) throws IOException { + File file = createTempFile(); + IOUtils.copy(stream, + new FileOutputStream(file)); + return file; + } - private File createTempFile() throws IOException { - return File.createTempFile("temp", null, home); + private static File createTempFile() throws IOException { + return File.createTempFile("temp", null); } } \ No newline at end of file Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (working copy) @@ -19,14 +19,16 @@ import java.io.BufferedWriter; import java.io.Closeable; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; import java.sql.Timestamp; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -41,11 +43,14 @@ import com.google.common.collect.PeekingIterator; import com.google.common.io.Closeables; import com.google.common.io.Files; -import com.google.common.util.concurrent.ListenableFutureTask; -import com.google.common.util.concurrent.MoreExecutors; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.oak.commons.IOUtils; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,14 +72,13 @@ public static final int DEFAULT_BATCH_COUNT = 2048; - public static enum State {NOT_RUNNING, MARKING, SWEEPING} + public static enum State { + NOT_RUNNING, MARKING, SWEEPING + } /** The last modified time before current time of blobs to consider for garbage collection. */ private final long maxLastModifiedInterval; - /** Run concurrently when possible. */ - private final boolean runConcurrently; - /** The blob store to be garbage collected. */ private final GarbageCollectableBlobStore blobStore; @@ -89,6 +93,8 @@ /** The batch count. 
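The batchCount field declared here bounds how many blob ids are handed to the executor at a time during the sweep. The sweep loop batches while streaming the candidates; with the candidates already in memory, the same slicing could be expressed with Guava, as in this hypothetical helper (Sweeper is the inner class defined later in this file):

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

import com.google.common.collect.Lists;

class BatchingSketch {
    static void submitInBatches(Executor executor, List<String> candidateIds, int batchCount,
            final ConcurrentLinkedQueue<String> exceptionQueue) {
        for (final List<String> batch : Lists.partition(candidateIds, batchCount)) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    // a real Sweeper calls blobStore.deleteChunks(batch, cutoff) here and
                    // adds the ids to exceptionQueue when the deletion fails
                    System.out.println("sweeping batch of " + batch.size() + " ids");
                }
            });
        }
    }
}
```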
*/ private final int batchCount; + private String repoId; + /** Flag to indicate the state of the gc **/ private State state = State.NOT_RUNNING; @@ -110,16 +116,16 @@ Executor executor, String root, int batchCount, - boolean runBackendConcurrently, - long maxLastModifiedInterval) + long maxLastModifiedInterval, + String repositoryId) throws IOException { this.executor = executor; this.blobStore = blobStore; this.marker = marker; this.batchCount = batchCount; - this.runConcurrently = runBackendConcurrently; this.maxLastModifiedInterval = maxLastModifiedInterval; - fs = new GarbageCollectorFileState(root); + this.repoId = repositoryId; + fs = new GarbageCollectorFileState(root); } /** @@ -126,11 +132,13 @@ * Instantiates a new blob garbage collector. */ public MarkSweepGarbageCollector( - BlobReferenceRetriever marker, + BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore, - Executor executor) + Executor executor, + String repositoryId) throws IOException { - this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, TimeUnit.HOURS.toMillis(24)); + this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, TimeUnit.HOURS + .toMillis(24), repositoryId); } public MarkSweepGarbageCollector( @@ -137,16 +145,23 @@ BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore, Executor executor, - long maxLastModifiedInterval) + long maxLastModifiedInterval, + String repositoryId) throws IOException { - this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, maxLastModifiedInterval); + this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, + maxLastModifiedInterval, repositoryId); } @Override public void collectGarbage() throws Exception { - markAndSweep(); + markAndSweep(false); } + @Override + public void collectGarbage(boolean markOnly) throws Exception { + markAndSweep(markOnly); + } + /** * Gets the state of the gc process. * @@ -158,8 +173,11 @@ /** * Mark and sweep. Main method for GC. + * + * @param markOnly whether to mark only + * @throws Exception the exception */ - private void markAndSweep() throws IOException, InterruptedException { + private void markAndSweep(boolean markOnly) throws Exception { boolean threw = true; try { Stopwatch sw = Stopwatch.createStarted(); @@ -166,13 +184,17 @@ LOG.info("Starting Blob garbage collection"); mark(); - int deleteCount = sweep(); - threw = false; + if (!markOnly) { + int deleteCount = sweep(); + threw = false; - LOG.info("Blob garbage collection completed in {}. Number of blobs " + - "deleted [{}]", sw.toString(), deleteCount); + LOG.info("Blob garbage collection completed in {}. Number of blobs " + + "deleted [{}]", sw.toString(), deleteCount); + } } finally { - Closeables.close(fs, threw); + if (LOG.isTraceEnabled()) { + Closeables.close(fs, threw); + } state = State.NOT_RUNNING; } } @@ -180,29 +202,15 @@ /** * Mark phase of the GC. 
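To make the new wiring concrete, this is roughly how a caller constructs and drives the collector after this change. The store, executor and repositoryId arguments are assumed to exist in the caller; the constructor signature is the one introduced in this hunk:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.document.DocumentBlobReferenceRetriever;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;

class GcWiringSketch {
    static void runSharedGc(DocumentNodeStore store, Executor executor, String repositoryId)
            throws Exception {
        MarkSweepGarbageCollector collector = new MarkSweepGarbageCollector(
                new DocumentBlobReferenceRetriever(store),
                (GarbageCollectableBlobStore) store.getBlobStore(),
                executor,
                TimeUnit.HOURS.toMillis(24),   // maxLastModifiedInterval
                repositoryId);
        collector.collectGarbage(true);        // mark only: persist this repository's references
        collector.collectGarbage(false);       // full mark + sweep, once all repositories have marked
    }
}
```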
*/ - private void mark() throws IOException, InterruptedException { + private void mark() throws IOException, DataStoreException { state = State.MARKING; LOG.debug("Starting mark phase of the garbage collector"); - // Find all blobs available in the blob store - ListenableFutureTask blobIdRetriever = ListenableFutureTask.create(new BlobIdRetriever()); - if (runConcurrently) { - executor.execute(blobIdRetriever); - } else { - MoreExecutors.sameThreadExecutor().execute(blobIdRetriever); - } - - // Find all blob references after iterating over the whole repository + // Mark all used references iterateNodeTree(); - try { - blobIdRetriever.get(); - } catch (ExecutionException e) { - LOG.warn("Error occurred while fetching all the blobIds from the BlobStore. GC would " + - "continue with the blobIds retrieved so far", e.getCause()); - } - - difference(); + // Move the marked references file if applicable + GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId); LOG.debug("Ending mark phase of the garbage collector"); } @@ -248,11 +256,33 @@ /** * Sweep phase of gc candidate deletion. - * - * @throws IOException - * Signals that an I/O exception has occurred. + *

+ * Performs the following steps:
+ * <ul>
+ * <li>Merge all marked references available.</li>
+ * <li>Retrieve all blob ids available.</li>
+ * <li>Diffs the 2 sets above to retrieve the list of blob ids not used.</li>
+ * <li>Deletes the above set.</li>
+ * </ul>
+ * + * @return the number of blobs deleted + * @throws Exception the exception */ - private int sweep() throws IOException { + private int sweep() throws Exception { + long earliestRefAvailTime = System.currentTimeMillis(); + // Only go ahead if merge succeeded + try { + earliestRefAvailTime = + GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs); + } catch (Exception e) { + return 0; + } + + // Find all blob references after iterating over the whole repository + (new BlobIdRetriever()).call(); + + // Calculate the references not used + difference(); int count = 0; state = State.SWEEPING; LOG.debug("Starting sweep phase of the garbage collector"); @@ -268,13 +298,13 @@ if (ids.size() > getBatchCount()) { count += ids.size(); - executor.execute(new Sweeper(ids, exceptionQueue)); + executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime)); ids = Lists.newArrayList(); } } if (!ids.isEmpty()) { count += ids.size(); - executor.execute(new Sweeper(ids, exceptionQueue)); + executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime)); } count -= exceptionQueue.size(); @@ -292,6 +322,8 @@ LOG.warn("Unable to delete some blob entries from the blob store. Details around such blob entries " + "can be found in [{}]", fs.getGarbage().getAbsolutePath()); } + // Remove all the merged marked references + GarbageCollectionType.get(blobStore).removeAllMarkedReferences(blobStore); LOG.debug("Ending sweep phase of the garbage collector"); return count; } @@ -300,10 +332,10 @@ return batchCount; } - private long getLastMaxModifiedTime(){ + private long getLastMaxModifiedTime(long maxModified) { return maxLastModifiedInterval > 0 ? - System.currentTimeMillis() - maxLastModifiedInterval : 0; - + (maxModified <= 0 ? System.currentTimeMillis() : maxModified) + - maxLastModifiedInterval : 0; } /** @@ -327,9 +359,13 @@ /** The ids to sweep. 
*/
        private final List<String> ids;

-        public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue) {
+        private final long maxModified;
+
+        public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue,
+                long maxModified) {
            this.exceptionQueue = exceptionQueue;
            this.ids = ids;
+            this.maxModified = maxModified;
        }

        @Override
@@ -336,7 +372,7 @@
        public void run() {
            try {
                LOG.debug("Blob ids to be deleted {}", ids);
-                boolean deleted = blobStore.deleteChunks(ids,getLastMaxModifiedTime());
+                boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime(maxModified));
                if (!deleted) {
                    exceptionQueue.addAll(ids);
                }
@@ -363,7 +399,7 @@
                    @Override
                    public void addReference(String blobId) {
                        if (debugMode) {
-                            LOG.trace("BlobId : {}",blobId);
+                            LOG.trace("BlobId : {}", blobId);
                        }

                        try {
@@ -378,7 +414,7 @@
                            }
                            if (debugMode) {
-                                LOG.trace("chunkId : {}",id);
+                                LOG.trace("chunkId : {}", id);
                            }
                            count.getAndIncrement();
                        }
@@ -392,11 +428,11 @@
                        }
                    }
                }
-            );
+        );
            LOG.info("Number of valid blob references marked under mark phase of " +
-                    "Blob garbage collection [{}]",count.get());
+                    "Blob garbage collection [{}]", count.get());
            // sort the marked references
-            fs.sort(fs.getMarkedRefs());
+            GarbageCollectorFileState.sort(fs.getMarkedRefs());
        } finally {
            IOUtils.closeQuietly(writer);
        }
@@ -408,7 +444,7 @@
     */
    private class BlobIdRetriever implements Callable<Integer> {
        @Override
-        public Integer call() throws Exception {
+        public Integer call() throws Exception {
            LOG.debug("Starting retrieve of all blobs");
            BufferedWriter bufferWriter = null;
            int blobsCount = 0;
@@ -415,7 +451,7 @@
            try {
                bufferWriter = new BufferedWriter(
                        new FileWriter(fs.getAvailableRefs()));
-                Iterator<String> idsIter = blobStore.getAllChunkIds(getLastMaxModifiedTime());
+                Iterator<String> idsIter = blobStore.getAllChunkIds(0);
                List<String> ids = Lists.newArrayList();

                while (idsIter.hasNext()) {
@@ -433,16 +469,14 @@
                }

                // sort the file
-                fs.sort(fs.getAvailableRefs());
+                GarbageCollectorFileState.sort(fs.getAvailableRefs());
                LOG.debug("Number of blobs present in BlobStore : [{}] which have " +
-                        "been last modified before [{}]", blobsCount, timestampToString(getLastMaxModifiedTime()));
+                        "been last modified before [{}]", blobsCount, timestampToString(getLastMaxModifiedTime(0)));
            } finally {
                IOUtils.closeQuietly(bufferWriter);
            }
            return blobsCount;
        }
-
-
    }

@@ -449,7 +483,7 @@
    /**
     * FileLineDifferenceIterator class which iterates over the difference of 2 files line by line.
     */
-    static class FileLineDifferenceIterator extends AbstractIterator<String> implements Closeable{
+    static class FileLineDifferenceIterator extends AbstractIterator<String> implements Closeable {
        private final PeekingIterator<String> peekMarked;
        private final LineIterator marked;
        private final LineIterator all;
@@ -522,4 +556,110 @@
    private static String timestampToString(long timestamp){
        return (new Timestamp(timestamp) + "00").substring(0, 23);
    }
+
+    /**
+     * Enum that defines the different data store types and encodes their divergent behavior.
+     */
+    enum GarbageCollectionType {
+        SHARED {
+            /**
+             * Removes the marked references from the blob store root. Default is a no-op.
+             *
+             * @param blobStore the blob store
+             */
+            @Override
+            void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) {
+                SharedDataStoreUtils.deleteRootRecords((SharedDataStore) blobStore,
+                        SharedStoreRecordType.REFERENCES);
+            }
+
+            /**
+             * Merges all marked references available and returns the earliest time of the references.
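Why the earliest reference time matters: a blob uploaded after the oldest repository wrote its references file may be unreferenced in every marked set and must still survive the sweep. A small illustration of the cutoff arithmetic in getLastMaxModifiedTime; the sample timestamps are invented:

```java
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

class CutoffSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // invented: the oldest references record across all repositories is two hours old
        long earliestRefTime = now - TimeUnit.HOURS.toMillis(2);
        long maxLastModifiedInterval = TimeUnit.HOURS.toMillis(24);

        // mirrors getLastMaxModifiedTime(maxModified): fall back to 'now' when no
        // reference time is known, then step back by the safety interval
        long cutoff = (earliestRefTime <= 0 ? now : earliestRefTime) - maxLastModifiedInterval;

        // deleteChunks(ids, cutoff) skips every chunk modified at or after the cutoff
        System.out.println("only blobs last modified before " + new Timestamp(cutoff)
                + " are eligible for deletion");
    }
}
```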
+ * + * @param blobStore the blob store + * @param fs the fs + * @return the long the earliest time of the available references + * @throws IOException Signals that an I/O exception has occurred. + * @throws DataStoreException the data store exception + */ + @Override + long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, + GarbageCollectorFileState fs) + throws IOException, DataStoreException { + + List refFiles = + SharedDataStoreUtils + .getRootRecords((SharedDataStore) blobStore, + SharedStoreRecordType.REFERENCES); + // Get all the repositories registered + List repoFiles = + SharedDataStoreUtils + .getRootRecords((SharedDataStore) blobStore, + SharedStoreRecordType.REPOSITORY); + + // Retrieve repos for which reference files have not been created + Set unAvailRepos = + SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles); + if (unAvailRepos.isEmpty()) { + // List of files to be merged + List files = Lists.newArrayList(); + for (DataRecord refFile : refFiles) { + File file = GarbageCollectorFileState.copy(refFile.getStream()); + GarbageCollectorFileState.sort(file); + files.add(file); + } + + GarbageCollectorFileState.merge(files, fs.getMarkedRefs()); + + return SharedDataStoreUtils.getEarliestRecord(refFiles).getLastModified(); + } else { + LOG.error("Not all repositories have marked references available : {}", + unAvailRepos); + throw new IOException( + "Not all repositories have marked references available"); + } + } + + /** + * Adds the marked references to the blob store root. Default noop + * + * @param blobStore the blob store + * @param fs the fs + * @param repoId the repo id + * @throws DataStoreException the data store exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Override + void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, + String repoId) throws DataStoreException, IOException { + InputStream is = new FileInputStream(fs.getMarkedRefs()); + try { + SharedDataStoreUtils.addRootRecord((SharedDataStore) blobStore, + SharedStoreRecordType.REFERENCES, is, repoId); + } finally { + Closeables.close(is, false); + } + } + }, + DEFAULT; + + void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) {} + + void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, + String repoId) throws DataStoreException, IOException {} + + long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, + GarbageCollectorFileState fs) + throws IOException, DataStoreException { + return System.currentTimeMillis(); + } + + public static GarbageCollectionType get(GarbageCollectableBlobStore blobStore) { + if (blobStore instanceof SharedDataStore + && ((SharedDataStore) blobStore).getType() == Type.SHARED) { + return SHARED; + } + return DEFAULT; + } + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (working copy) @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * Interface to be implemented by a shared data store.
+ */
+public interface SharedDataStore {
+    /**
+     * Explicitly identifies the type of the data store.
+     */
+    enum Type {
+        SHARED, DEFAULT;
+    }
+
+    /**
+     * Adds the root record.
+     *
+     * @param stream the stream
+     * @param name the name of the root record
+     * @return the data record
+     * @throws DataStoreException the data store exception
+     */
+    DataRecord addRootRecord(InputStream stream, String name)
+            throws DataStoreException;
+
+    /**
+     * Gets all root records whose name starts with the given prefix.
+     *
+     * @param prefix the name prefix to match
+     * @return the matching root records
+     */
+    List<DataRecord> getAllRootRecords(String prefix);
+
+    /**
+     * Deletes the root record with the given name.
+     *
+     * @param name the name of the root record
+     * @return true if a record was deleted, false otherwise
+     */
+    boolean deleteRootRecord(String name);
+
+    /**
+     * Deletes all root records whose name starts with the given prefix.
+     *
+     * @param prefix the name prefix to match
+     */
+    void deleteAllRootRecords(String prefix);
+
+    /**
+     * Gets the type.
+     *
+     * @return the type
+     */
+    Type getType();
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedStoreRecordType.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedStoreRecordType.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedStoreRecordType.java (working copy)
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+
+/**
+ * Encapsulates the different types of records kept at the data store root.
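The enum defined below pairs each record type with a naming scheme for files at the data store root. A quick round trip shows the convention; the id value is invented:

```java
import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType;

public class RecordNameRoundTrip {
    public static void main(String[] args) {
        // real ids are cluster UUIDs; this one is invented
        String name = SharedStoreRecordType.REFERENCES.getNameFromId("c7f1-4a22");
        System.out.println(name); // references-c7f1-4a22

        // Splitter.on(DELIIM).limit(2) splits on the first delimiter only,
        // so ids that themselves contain "-" (such as UUIDs) survive the round trip
        String id = SharedStoreRecordType.REFERENCES.getIdFromName(name);
        System.out.println(id);   // c7f1-4a22
    }
}
```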
+ */ +public enum SharedStoreRecordType { + REFERENCES("references") { + @Override + public String getNameFromId(String id) { + return Joiner.on(DELIIM).join(getType(), id); + } + }, + REPOSITORY("repository") { + @Override + public String getNameFromId(String id) { + return Joiner.on(DELIIM).join(getType(), id); + } + }; + + private final String type; + + SharedStoreRecordType(String type) { + this.type = type; + } + + public String getType() + { + return type; + } + + public String getIdFromName(String name) { + return Splitter.on(DELIIM).limit(2).splitToList(name).get(1); + } + + public abstract String getNameFromId(String id); + + static final String DELIIM = "-"; +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedStoreRecordType.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -1700,7 +1700,8 @@ * @param blobGcMaxAgeInSecs */ @CheckForNull - public MarkSweepGarbageCollector createBlobGarbageCollector(long blobGcMaxAgeInSecs) { + public MarkSweepGarbageCollector createBlobGarbageCollector(long blobGcMaxAgeInSecs, + String repositoryId) { MarkSweepGarbageCollector blobGC = null; if(blobStore instanceof GarbageCollectableBlobStore){ try { @@ -1708,7 +1709,8 @@ new DocumentBlobReferenceRetriever(this), (GarbageCollectableBlobStore) blobStore, executor, - TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs)); + TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs), + repositoryId); } catch (IOException e) { throw new RuntimeException("Error occurred while initializing " + "the MarkSweepGarbageCollector",e); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (working copy) @@ -18,6 +18,7 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Dictionary; @@ -41,6 +42,8 @@ import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.kernel.KernelNodeStore; @@ -49,9 +52,14 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGC; import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type; +import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType; +import 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceFactory;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -319,7 +327,27 @@
         mkBuilder.setExecutor(executor);
         mk = mkBuilder.open();

+        try {
+            String repoId = ClusterRepositoryInfo.createId(mk.getNodeStore());
+            if (blobStore instanceof SharedDataStore
+                    && ((SharedDataStore) blobStore).getType() == Type.SHARED) {
+                try {
+                    SharedDataStoreUtils
+                            .addRootRecord(
+                                    ((SharedDataStore) blobStore),
+                                    SharedStoreRecordType.REPOSITORY,
+                                    new ByteArrayInputStream(new byte[0]),
+                                    repoId);
+                } catch (DataStoreException e) {
+                    throw new IOException(e);
+                }
+            }
+        } catch (CommitFailedException e) {
+            throw new IOException("Could not register a unique repositoryId", e);
+        }
+
         registerJMXBeans(mk.getNodeStore());
         registerLastRevRecoveryJob(mk.getNodeStore());
@@ -463,10 +491,18 @@
         BlobGarbageCollector gc = new BlobGarbageCollector() {
             @Override
             public void collectGarbage() throws Exception {
-                store.createBlobGarbageCollector(blobGcMaxAgeInSecs).collectGarbage();
+                collectGarbage(false);
             }
+
+            @Override
+            public void collectGarbage(boolean markOnly) throws Exception {
+                store.createBlobGarbageCollector(blobGcMaxAgeInSecs,
+                        ClusterRepositoryInfo.getId(mk.getNodeStore().getRoot()))
+                        .collectGarbage(markOnly);
+            }
         };
-        registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
+        registrations.add(registerMBean(whiteboard, BlobGCMBean.class,
+                new BlobGC(gc, executor),
                 BlobGCMBean.TYPE, "Document node store blob garbage collection"));
     }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java (working copy)
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.jackrabbit.oak.plugins.identifier; + +import java.util.UUID; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +/** + * Utility class to manage a unique cluster/repository id for the cluster. + */ +public class ClusterRepositoryInfo { + public static final String CLUSTER_CONFIG = ":clusterConfig"; + public static final String CLUSTER_ID = ":clusterId"; + + public static String createId(NodeStore store) throws CommitFailedException { + NodeBuilder root = store.getRoot().builder(); + if (!root.hasChildNode(CLUSTER_CONFIG)) { + String id = UUID.randomUUID().toString(); + root.child(CLUSTER_CONFIG).setProperty(CLUSTER_ID, id); + store.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); + return id; + } else { + return root.getChildNode(CLUSTER_CONFIG).getProperty(CLUSTER_ID).getValue(Type.STRING); + } + } + + public static String getId(NodeState root) { + return root.getChildNode(CLUSTER_CONFIG).getProperty(CLUSTER_ID).getValue(Type.STRING); + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (working copy) @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; +import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -34,6 +35,8 @@ import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.osgi.ObserverTracker; import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; import org.apache.jackrabbit.oak.plugins.blob.BlobGC; @@ -40,6 +43,11 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type; +import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; @@ -187,15 +195,42 @@ revisionGCRegistration = 
registerMBean(whiteboard, RevisionGCMBean.class, revisionGC,
                RevisionGCMBean.TYPE, "Segment node store revision garbage collection");

+        try {
+            String repoId = ClusterRepositoryInfo.createId(delegate);
+            // If the data store is shared, register the repository id with it
+            if (blobStore instanceof SharedDataStore
+                    && ((SharedDataStore) blobStore).getType() == Type.SHARED) {
+                try {
+                    SharedDataStoreUtils
+                            .addRootRecord(
+                                    ((SharedDataStore) blobStore),
+                                    SharedStoreRecordType.REPOSITORY,
+                                    new ByteArrayInputStream(new byte[0]),
+                                    repoId);
+                } catch (DataStoreException e) {
+                    throw new IOException("Could not register a unique repositoryId", e);
+                }
+            }
+        } catch (CommitFailedException e) {
+            throw new IOException("Could not register a unique repositoryId", e);
+        }

        if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
            BlobGarbageCollector gc = new BlobGarbageCollector() {
                @Override
                public void collectGarbage() throws Exception {
+                    collectGarbage(false);
+                }
+
+                @Override
+                public void collectGarbage(boolean markOnly) throws Exception {
                    MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                            new SegmentBlobReferenceRetriever(store.getTracker()),
                            (GarbageCollectableBlobStore) store.getBlobStore(),
-                            executor);
-                    gc.collectGarbage();
+                            executor,
+                            ClusterRepositoryInfo.getId(delegate.getRoot()));
+                    gc.collectGarbage(markOnly);
                }
            };
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java (working copy)
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import junit.framework.Assert;
+
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the ClusterRepositoryInfo unique cluster repository id.
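The test below exercises the property the shared GC relies on: createId is get-or-create, so every node of a cluster sharing one backing store observes the same id. The same contract in miniature, against an in-memory store (MemoryNodeStore is used purely for illustration):

```java
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.spi.state.NodeStore;

public class ClusterIdSketch {
    public static void main(String[] args) throws Exception {
        NodeStore store = new MemoryNodeStore();
        String first = ClusterRepositoryInfo.createId(store);  // persists a fresh UUID
        String second = ClusterRepositoryInfo.createId(store); // returns the stored one
        assert first.equals(second);
        assert first.equals(ClusterRepositoryInfo.getId(store.getRoot()));
    }
}
```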
+ */ +public class ClusterRepositoryInfoTest { + static BlobStore blobStore; + + @BeforeClass + public static void setup() { + try { + blobStore = DataStoreUtils.getBlobStore(); + Assume.assumeThat(blobStore, instanceOf(SharedDataStore.class)); + } catch (Exception e) { + Assume.assumeNoException(e); + } + } + + @Test + public void differentCluster() throws Exception { + DocumentNodeStore ds1 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(new MemoryDocumentStore()) + .setBlobStore(blobStore) + .getNodeStore(); + + String repoId1 = ClusterRepositoryInfo.createId(ds1); + DocumentNodeStore ds2 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(new MemoryDocumentStore()) + .setBlobStore(blobStore) + .getNodeStore(); + String repoId2 = ClusterRepositoryInfo.createId(ds2); + + Assert.assertNotSame(repoId1, repoId2); + } + + @Test + public void sameCluster() throws Exception { + MemoryDocumentStore store = new MemoryDocumentStore(); + DocumentNodeStore ds1 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(store) + .setClusterId(1) + .setBlobStore(blobStore) + .getNodeStore(); + String repoId1 = ClusterRepositoryInfo.createId(ds1); + ds1.runBackgroundOperations(); + + DocumentNodeStore ds2 = new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(store) + .setClusterId(2) + .setBlobStore(blobStore) + .getNodeStore(); + String repoId2 = ClusterRepositoryInfo.createId(ds2); + + // Since the same cluster the ids should be equal + Assert.assertEquals(repoId1, repoId2); + } +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java (working copy) @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.blob; + +import static org.hamcrest.CoreMatchers.instanceOf; + +import java.io.ByteArrayInputStream; +import java.util.UUID; + +import junit.framework.Assert; + +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; +import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test for SharedDataUtils to test addition, retrieval and deletion of root records. + */ +public class SharedDataStoreUtilsTest { + SharedDataStore dataStore; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + try { + Assume.assumeThat(DataStoreUtils.getBlobStore(), instanceOf(SharedDataStore.class)); + } catch (Exception e) { + Assume.assumeNoException(e); + } + } + + @Test + public void test() throws Exception { + dataStore = DataStoreUtils.getBlobStore(); + String repoId1 = UUID.randomUUID().toString(); + String repoId2 = UUID.randomUUID().toString(); + + // Add repository records + DataRecord repo1 = SharedDataStoreUtils.addRootRecord( + dataStore, + SharedStoreRecordType.REPOSITORY, + new ByteArrayInputStream(new byte[0]), + repoId1); + DataRecord repo2 = SharedDataStoreUtils.addRootRecord( + dataStore, + SharedStoreRecordType.REPOSITORY, + new ByteArrayInputStream(new byte[0]), + repoId2); + + // Add reference records + DataRecord rec1 = SharedDataStoreUtils.addRootRecord( + dataStore, + SharedStoreRecordType.REFERENCES, + new ByteArrayInputStream(new byte[0]), + repoId1); + DataRecord rec2 = SharedDataStoreUtils.addRootRecord( + dataStore, + SharedStoreRecordType.REFERENCES, + new ByteArrayInputStream(new byte[0]), + repoId2); + + Assert.assertEquals( + SharedStoreRecordType.REPOSITORY.getIdFromName(repo1.getIdentifier().toString()), + repoId1); + Assert.assertEquals( + SharedStoreRecordType.REPOSITORY.getIdFromName(repo2.getIdentifier().toString()), + repoId2); + Assert.assertEquals( + SharedStoreRecordType.REFERENCES.getIdFromName(rec1.getIdentifier().toString()), + repoId1); + Assert.assertEquals( + SharedStoreRecordType.REFERENCES.getIdFromName(rec2.getIdentifier().toString()), + repoId2); + + // All the references from registered repositories are available + Assert.assertTrue( + SharedDataStoreUtils.refsNotAvailableFromRepos( + SharedDataStoreUtils.getRootRecords(dataStore, + SharedStoreRecordType.REPOSITORY), + SharedDataStoreUtils.getRootRecords(dataStore, + SharedStoreRecordType.REFERENCES)).isEmpty()); + + // Earliest should be the 1st reference record + Assert.assertEquals( + SharedDataStoreUtils.getEarliestRecord( + SharedDataStoreUtils.getRootRecords(dataStore, + SharedStoreRecordType.REFERENCES)).getIdentifier().toString(), + SharedStoreRecordType.REFERENCES.getNameFromId(repoId1)); + + // Delete references and check back if deleted + SharedDataStoreUtils.deleteRootRecords(dataStore, SharedStoreRecordType.REFERENCES); + Assert.assertTrue(SharedDataStoreUtils.getRootRecords(dataStore, + SharedStoreRecordType.REFERENCES).isEmpty()); + + // Repository ids should still be available + Assert.assertEquals(2, + SharedDataStoreUtils.getRootRecords(dataStore, SharedStoreRecordType.REPOSITORY) + .size()); + } +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of 
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java (revision 1615951)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java (working copy)
@@ -24,6 +24,7 @@
 import org.apache.jackrabbit.core.data.FileDataStore;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
 import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
 import org.junit.Test;
 
@@ -46,8 +47,9 @@
     private static final String DS_PROP_PREFIX = "ds.";
     private static final String BS_PROP_PREFIX = "bs.";
+    public static long time;
 
     public static DataStoreBlobStore getBlobStore() throws Exception {
-        String className = System.getProperty(DS_CLASS_NAME, FileDataStore.class.getName());
+        String className = System.getProperty(DS_CLASS_NAME, OakFileDataStore.class.getName());
         DataStore ds = Class.forName(className).asSubclass(DataStore.class).newInstance();
         PropertiesUtil.populate(ds, getConfig(), false);
         ds.init(getHomeDir());
@@ -67,7 +69,8 @@
     }
 
     private static String getHomeDir() {
-        return concat(new File(".").getAbsolutePath(), "target/blobstore/" + System.currentTimeMillis());
+        return concat(new File(".").getAbsolutePath(), "target/blobstore/"
+                + (time == 0 ? System.currentTimeMillis() : time));
     }
 
     @Test
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (revision 1615951)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (working copy)
@@ -34,6 +34,7 @@
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -138,6 +139,7 @@
         addInlined();
         gc(set);
     }
+
     @Test
     public void gcVersionDeleteWithInlined() throws Exception {
         HashSet<String> set = setUp(false);
@@ -144,13 +146,15 @@
         addInlined();
         gc(set);
     }
+
     private void gc(HashSet<String> set) throws Exception {
         DocumentNodeStore store = mk.getNodeStore();
+        String repoId = ClusterRepositoryInfo.createId(store);
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new DocumentBlobReferenceRetriever(store),
                 (GarbageCollectableBlobStore) store.getBlobStore(),
                 MoreExecutors.sameThreadExecutor(),
-                "./target", 5, true, 0);
+                "./target", 5, 0, repoId);
         gc.collectGarbage();
 
         Set<String> existing = iterate();
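Both the Mongo and the Segment GC tests switch the MarkSweepGarbageCollector call shape from ("./target", 5, true, 0) to ("./target", 5, 0, repoId): the old boolean flag is dropped and the repository id obtained from ClusterRepositoryInfo is passed instead. A minimal sketch of the new two-phase usage, assuming a DocumentNodeStore named ns as in the tests (parameter meanings are inferred from the call sites; the constructor declaration itself is outside this patch):

    String repoId = ClusterRepositoryInfo.createId(ns);
    MarkSweepGarbageCollector collector = new MarkSweepGarbageCollector(
            new DocumentBlobReferenceRetriever(ns),
            (GarbageCollectableBlobStore) ns.getBlobStore(),
            MoreExecutors.sameThreadExecutor(),
            "./target", 5, 0, repoId);
    collector.collectGarbage(true);   // mark only: record this repository's references
    collector.collectGarbage(false);  // mark and sweep: remove blobs referenced by no repository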
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java (working copy)
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.document.blob.ds.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for GC in a shared data store among heterogeneous Oak node stores.
+ */
+public class SharedBlobStoreGCTest {
+    private Cluster cluster1;
+    private Cluster cluster2;
+    private Clock clock;
+
+    @Before
+    public void setUp() throws Exception {
+        clock = new Clock.Virtual();
+        clock.waitUntil(Revision.getCurrentTimestamp());
+        DataStoreUtils.time = clock.getTime();
+
+        BlobStore blobStore1 = DataStoreUtils.getBlobStore();
+        DocumentNodeStore ds1 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(new MemoryDocumentStore())
+                .setBlobStore(blobStore1)
+                .clock(clock)
+                .getNodeStore();
+        String repoId1 = ClusterRepositoryInfo.createId(ds1);
+        // Register the unique repository id in the data store
+        SharedDataStoreUtils.addRootRecord(
+                (SharedDataStore) blobStore1,
+                SharedStoreRecordType.REPOSITORY,
+                new ByteArrayInputStream(new byte[0]),
+                repoId1);
+
+        BlobStore blobStore2 = DataStoreUtils.getBlobStore();
+        DocumentNodeStore ds2 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(new MemoryDocumentStore())
+                .setBlobStore(blobStore2)
+                .clock(clock)
+                .getNodeStore();
+        String repoId2 = ClusterRepositoryInfo.createId(ds2);
+        // Register the unique repository id in the data store
+        SharedDataStoreUtils.addRootRecord(
+                (SharedDataStore) blobStore2,
+                SharedStoreRecordType.REPOSITORY,
+                new ByteArrayInputStream(new byte[0]),
+                repoId2);
+
+        cluster1 = new Cluster(ds1, repoId1, 20);
+        cluster1.init();
+        cluster2 = new Cluster(ds2, repoId2, 100);
+        cluster2.init();
+    }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+
+    @Test
+    public void testGC() throws Exception {
+        // Run only the mark phase on both clusters so that each registers
+        // its blob references in the shared data store
+        cluster1.gc.collectGarbage(true);
+        cluster2.gc.collectGarbage(true);
+
+        // Execute the gc with sweep; only blobs referenced by neither
+        // repository may be removed
+        cluster1.gc.collectGarbage(false);
+
+        Assert.assertTrue(Sets.symmetricDifference(
+                Sets.union(cluster1.getInitBlobs(), cluster2.getInitBlobs()),
+                cluster1.getExistingBlobIds()).isEmpty());
+    }
+
+    @After
+    public void tearDown() {
+        DataStoreUtils.time = 0;
+    }
+
+    class Cluster {
+        private DocumentNodeStore ds;
+        private int seed;
+        private BlobGarbageCollector gc;
+
+        private Set<String> initBlobs = new HashSet<String>();
+
+        protected Set<String> getInitBlobs() {
+            return initBlobs;
+        }
+
+        public Cluster(final DocumentNodeStore ds, final String repoId, int seed)
+                throws IOException {
+            this.ds = ds;
+            this.gc = new BlobGarbageCollector() {
+                @Override
+                public void collectGarbage() throws Exception {
+                    collectGarbage(false);
+                }
+
+                @Override
+                public void collectGarbage(boolean markOnly) throws Exception {
+                    MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
+                            new DocumentBlobReferenceRetriever(ds),
+                            (GarbageCollectableBlobStore) ds.getBlobStore(),
+                            MoreExecutors.sameThreadExecutor(),
+                            "./target", 5, 0, repoId);
+                    gc.collectGarbage(markOnly);
+                }
+            };
+            this.seed = seed;
+        }
+
+        /**
+         * Creates the setup load with deletions.
+         *
+         * @throws Exception
+         */
+        public void init() throws Exception {
+            NodeBuilder a = ds.getRoot().builder();
+
+            int number = 10;
+            // track the indexes of the assets to be deleted
+            List<Integer> deletes = Lists.newArrayList();
+            Random rand = new Random(47);
+            for (int i = 0; i < 5; i++) {
+                int n = rand.nextInt(number);
+                if (!deletes.contains(n)) {
+                    deletes.add(n);
+                }
+            }
+            for (int i = 0; i < number; i++) {
+                Blob b = ds.createBlob(randomStream(i + seed, 4160));
+                if (!deletes.contains(i)) {
+                    // Remember the chunk ids of the surviving blobs,
+                    // as these must still exist after gc
+                    Iterator<String> idIter =
+                            ((GarbageCollectableBlobStore) ds.getBlobStore())
+                                    .resolveChunks(b.toString());
+                    while (idIter.hasNext()) {
+                        initBlobs.add(idIter.next());
+                    }
+                }
+                a.child("c" + i).setProperty("x", b);
+            }
+            ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+            a = ds.getRoot().builder();
+            for (int id : deletes) {
+                a.child("c" + id).remove();
+                ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            }
+
+            // Move the clock past the version GC age so that the deleted
+            // documents, and with them the blob references, can be collected
+            long maxAge = 10; // minutes
+            clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(maxAge));
+
+            VersionGarbageCollector vGC = ds.getVersionGarbageCollector();
+            VersionGCStats stats = vGC.gc(0, TimeUnit.MILLISECONDS);
+            Assert.assertEquals(deletes.size(), stats.deletedDocGCCount);
+        }
+
+        public Set<String> getExistingBlobIds() throws Exception {
+            GarbageCollectableBlobStore store = (GarbageCollectableBlobStore) ds.getBlobStore();
+            Iterator<String> cur = store.getAllChunkIds(0);
+
+            Set<String> existing = Sets.newHashSet();
+            while (cur.hasNext()) {
+                existing.add(cur.next());
+            }
+            return existing;
+        }
+    }
+}

Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property

Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (revision 1615951)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (working copy)
@@ -40,6 +40,7 @@
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -121,12 +122,13 @@
     @Test
     public void gc() throws Exception {
         HashSet<String> set = setUp();
+        String repoId = ClusterRepositoryInfo.createId(nodeStore);
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new SegmentBlobReferenceRetriever(store.getTracker()),
                 (GarbageCollectableBlobStore) store.getBlobStore(),
                 MoreExecutors.sameThreadExecutor(),
-                "./target", 2048, true, 0);
+                "./target", 2048, 0, repoId);
         gc.collectGarbage();
 
         Set<String> existing = iterate();