Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (working copy) @@ -27,4 +27,13 @@ * @throws Exception */ void collectGarbage() throws Exception; -} \ No newline at end of file + + /** + * Marks garbage blobs from the passed node store instance. + * Collects them only if markOnly is false. + * + * @param markOnly whether to run mark only phase + * @throws Exception the exception + */ + void collectGarbage(boolean markOnly) throws Exception; +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (working copy) @@ -19,6 +19,9 @@ package org.apache.jackrabbit.oak.plugins.blob.datastore; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterators.filter; +import static com.google.common.collect.Iterators.transform; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; @@ -48,14 +51,12 @@ import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.core.data.MultiDataStoreAware; import org.apache.jackrabbit.oak.cache.CacheLIRS; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static 
com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.collect.Iterators.filter; -import static com.google.common.collect.Iterators.transform; /** * BlobStore wrapper for DataStore. Wraps Jackrabbit 2 DataStore and expose them as BlobStores @@ -62,7 +63,8 @@ * It also handles inlining binaries if there size is smaller than * {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()} */ -public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollectableBlobStore { +public class DataStoreBlobStore implements DataStore, SharedDataStore, BlobStore, + GarbageCollectableBlobStore { private final Logger log = LoggerFactory.getLogger(getClass()); private final DataStore delegate; @@ -541,4 +543,43 @@ return new BlobId(dr); } } + + @Override + public DataRecord addRootRecord(InputStream stream, String name) throws DataStoreException { + if (delegate instanceof SharedDataStore) { + return ((SharedDataStore) delegate).addRootRecord(stream, name); + } + return null; + } + + @Override + public List getAllRootRecords(String prefix) { + if (delegate instanceof SharedDataStore) { + return ((SharedDataStore) delegate).getAllRootRecords(prefix); + } + return null; + } + + @Override + public boolean deleteRootRecord(String name) { + if (delegate instanceof SharedDataStore) { + return ((SharedDataStore) delegate).deleteRootRecord(name); + } + return false; + } + + @Override + public void deleteAllRootRecords(String prefix) { + if (delegate instanceof SharedDataStore) { + ((SharedDataStore) delegate).deleteAllRootRecords(prefix); + } + } + + @Override + public Type getType() { + if (delegate instanceof SharedDataStore) { + return Type.SHARED; + } + return Type.DEFAULT; + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/OakFileDataStore.java =================================================================== --- 
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/OakFileDataStore.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/OakFileDataStore.java (working copy) @@ -20,10 +20,15 @@ package org.apache.jackrabbit.oak.plugins.blob.datastore; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.lang.ref.WeakReference; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Set; import com.google.common.base.Charsets; @@ -30,16 +35,27 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.io.BaseEncoding; +import com.google.common.io.Closeables; import com.google.common.io.Files; + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataRecord; import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.core.data.FileDataRecord; import org.apache.jackrabbit.core.data.FileDataStore; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Oak specific extension of JR2 FileDataStore which enables * provisioning the signing key via OSGi config */ -public class OakFileDataStore extends FileDataStore { +public class OakFileDataStore extends FileDataStore implements SharedDataStore { + public static final Logger LOG = LoggerFactory.getLogger(OakFileDataStore.class); private byte[] referenceKey; public OakFileDataStore() { @@ -51,11 +67,12 @@ @Override public Iterator getAllIdentifiers() { + final String path = FilenameUtils.normalizeNoEndSeparator(getPath()); return 
Files.fileTreeTraverser().postOrderTraversal(new File(getPath())) .filter(new Predicate() { @Override public boolean apply(File input) { - return input.isFile(); + return input.isFile() && !input.getParent().equals(path); } }) .transform(new Function() { @@ -117,4 +134,82 @@ return Collections.emptySet(); } } + + @Override + public DataRecord addRootRecord(InputStream input, String name) + throws DataStoreException { + try { + File file = new File(getPath(), name); + FileOutputStream os = new FileOutputStream(file); + try { + IOUtils.copyLarge(input, os); + } finally { + Closeables.close(os, true); + Closeables.close(input, true); + } + + return new FileDataRecord(this, + new DataIdentifier(name), + file); + } catch (IOException e) { + LOG.error("Exception while adding root record with name {}, {}", + new Object[] {name, e}); + throw new DataStoreException("Could not add root record", e); + } + } + + @Override + public List getAllRootRecords(String prefix) { + File root = new File(getPath()); + List rootRecords = new ArrayList(); + for (File file : FileFilterUtils.filterList( + FileFilterUtils.prefixFileFilter(prefix), + root.listFiles())) { + if (!file.isDirectory()) { // skip directories which are actual data store files + rootRecords.add( + new FileDataRecord(this, new DataIdentifier(file.getName()), file)); + } + } + return rootRecords; + } + + @Override + public boolean deleteRootRecord(String name) { + File root = new File(getPath()); + + for (File file : FileFilterUtils.filterList( + FileFilterUtils.nameFileFilter(name), + root.listFiles())) { + if (!file.isDirectory()) { // skip directories which are actual data store files + if (!file.delete()) { + LOG.warn("Failed to delete root record {} ", new Object[] {file + .getAbsolutePath()}); + } else { + return true; + } + } + } + return false; + } + + @Override + public void deleteAllRootRecords(String prefix) { + File root = new File(getPath()); + + for (File file : FileFilterUtils.filterList( + 
FileFilterUtils.prefixFileFilter(prefix), + root.listFiles())) { + if (!file.isDirectory()) { // skip directories which are actual data store files + if (!file.delete()) { + LOG.warn("Failed to delete root record {} ", new Object[] {file + .getAbsolutePath()}); + } + } + } + } + + @Override + public Type getType() { + return Type.DEFAULT; + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (working copy) @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.blob.datastore; + +import java.io.InputStream; +import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; + +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType; + +/** + * Utility class for {@link SharedDataStore}. + */ +public class SharedDataStoreUtils { + + /** + * Add a root record. + * + * @param dataStore the data store where record is to be added + * @param type the type of record + * @param stream the record stream + * @param id the id of the record + * @return the data record + * @throws DataStoreException the data store exception + */ + public static DataRecord addRootRecord(SharedDataStore dataStore, SharedStoreRecordType type, + InputStream stream, String id) + throws DataStoreException { + return dataStore.addRootRecord(stream, type.getNameFromId(id)); + } + + /** + * Gets the root records with the given prefix. + * + * @param dataStore the data store + * @param record the record + * @return the root records + * @throws DataStoreException the data store exception + */ + public static List getRootRecords(SharedDataStore dataStore, + SharedStoreRecordType record) + throws DataStoreException { + return dataStore.getAllRootRecords(record.getType()); + } + + /** + * Delete root records. + * + * @param dataStore the data store + * @param type the type + */ + public static void deleteRootRecords(SharedDataStore dataStore, SharedStoreRecordType type) { + dataStore.deleteAllRootRecords(type.getType()); + } + + /** + * Gets the earliest record of the available reference records. 
+ * + * @param recs the list of reference data records + * @return the earliest record + */ + public static DataRecord getEarliestRecord(List recs) { + return Ordering.natural().onResultOf( + new Function() { + @Override + @Nullable + public Long apply(@Nullable DataRecord input) { + return input.getLastModified(); + } + }).min(recs); + } + + /** + * Computes the repositories for which marked references are not available. + * + * @param repos the repository root records + * @param refs the references root records + * @return the set of repository ids whose references are not available + */ + public static Set refsNotAvailableFromRepos(List repos, + List refs) { + return Sets.difference(FluentIterable.from(repos).uniqueIndex( + new Function() { + @Override + @Nullable + public String apply(@Nullable DataRecord input) { + return SharedStoreRecordType.REPOSITORY.getIdFromName(input + .getIdentifier() + .toString()); + } + }).keySet(), + FluentIterable.from(refs).uniqueIndex( + new Function() { + @Override + @Nullable + public String apply(@Nullable DataRecord input) { + return SharedStoreRecordType.REFERENCES.getIdFromName(input + .getIdentifier() + .toString()); + } + }).keySet()); + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (working copy) @@ -18,12 +18,16 @@ import java.io.Closeable; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Comparator; +import java.util.List; import 
com.google.common.io.Files; import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.commons.IOUtils; import org.apache.jackrabbit.oak.commons.sort.ExternalSort; /** @@ -47,6 +51,13 @@ /** The garbage stores the garbage collection candidates which were not deleted . */ private final File garbage; + private final static Comparator lexComparator = + new Comparator() { + @Override + public int compare(String s1, String s2) { + return s1.compareTo(s2); + } + }; /** * Instantiates a new garbage collector file state. @@ -118,21 +129,26 @@ * * @param file file whose contents needs to be sorted */ - public void sort(File file) throws IOException { + public static void sort(File file) throws IOException { File sorted = createTempFile(); - Comparator lexComparator = new Comparator() { - @Override - public int compare(String s1, String s2) { - return s1.compareTo(s2); - } - }; - ExternalSort.mergeSortedFiles( - ExternalSort.sortInBatch(file, lexComparator, true), - sorted, lexComparator, true); + merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted); Files.move(sorted, file); } + + public static void merge(List files, File output) throws IOException { + ExternalSort.mergeSortedFiles( + files, + output, lexComparator, true); + } + + public static File copy(InputStream stream) throws IOException { + File file = createTempFile(); + IOUtils.copy(stream, + new FileOutputStream(file)); + return file; + } - private File createTempFile() throws IOException { - return File.createTempFile("temp", null, home); + private static File createTempFile() throws IOException { + return File.createTempFile("temp", null); } } \ No newline at end of file Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (revision 1615951) +++ 
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (working copy) @@ -19,14 +19,16 @@ import java.io.BufferedWriter; import java.io.Closeable; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; import java.sql.Timestamp; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -41,11 +43,14 @@ import com.google.common.collect.PeekingIterator; import com.google.common.io.Closeables; import com.google.common.io.Files; -import com.google.common.util.concurrent.ListenableFutureTask; -import com.google.common.util.concurrent.MoreExecutors; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.oak.commons.IOUtils; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,14 +72,13 @@ public static final int DEFAULT_BATCH_COUNT = 2048; - public static enum State {NOT_RUNNING, MARKING, SWEEPING} + public static enum State { + NOT_RUNNING, MARKING, SWEEPING + } /** The last modified time before current time of blobs to consider for garbage collection. */ private final long maxLastModifiedInterval; - /** Run concurrently when possible. */ - private final boolean runConcurrently; - /** The blob store to be garbage collected. 
*/ private final GarbageCollectableBlobStore blobStore; @@ -89,6 +93,8 @@ /** The batch count. */ private final int batchCount; + private String repoId; + /** Flag to indicate the state of the gc **/ private State state = State.NOT_RUNNING; @@ -110,16 +116,16 @@ Executor executor, String root, int batchCount, - boolean runBackendConcurrently, - long maxLastModifiedInterval) + long maxLastModifiedInterval, + String repositoryId) throws IOException { this.executor = executor; this.blobStore = blobStore; this.marker = marker; this.batchCount = batchCount; - this.runConcurrently = runBackendConcurrently; this.maxLastModifiedInterval = maxLastModifiedInterval; - fs = new GarbageCollectorFileState(root); + this.repoId = repositoryId; + fs = new GarbageCollectorFileState(root); } /** @@ -126,11 +132,13 @@ * Instantiates a new blob garbage collector. */ public MarkSweepGarbageCollector( - BlobReferenceRetriever marker, + BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore, - Executor executor) + Executor executor, + String repositoryId) throws IOException { - this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, TimeUnit.HOURS.toMillis(24)); + this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, TimeUnit.HOURS + .toMillis(24), repositoryId); } public MarkSweepGarbageCollector( @@ -137,16 +145,23 @@ BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore, Executor executor, - long maxLastModifiedInterval) + long maxLastModifiedInterval, + String repositoryId) throws IOException { - this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, maxLastModifiedInterval); + this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, + maxLastModifiedInterval, repositoryId); } @Override public void collectGarbage() throws Exception { - markAndSweep(); + markAndSweep(false); } + @Override + public void collectGarbage(boolean markOnly) throws Exception { + markAndSweep(markOnly); + } + /** * Gets the state of 
the gc process. * @@ -158,8 +173,11 @@ /** * Mark and sweep. Main method for GC. + * + * @param markOnly whether to mark only + * @throws Exception the exception */ - private void markAndSweep() throws IOException, InterruptedException { + private void markAndSweep(boolean markOnly) throws Exception { boolean threw = true; try { Stopwatch sw = Stopwatch.createStarted(); @@ -166,13 +184,17 @@ LOG.info("Starting Blob garbage collection"); mark(); - int deleteCount = sweep(); - threw = false; + if (!markOnly) { + int deleteCount = sweep(); + threw = false; - LOG.info("Blob garbage collection completed in {}. Number of blobs " + - "deleted [{}]", sw.toString(), deleteCount); + LOG.info("Blob garbage collection completed in {}. Number of blobs " + + "deleted [{}]", sw.toString(), deleteCount); + } } finally { - Closeables.close(fs, threw); + if (LOG.isTraceEnabled()) { + Closeables.close(fs, threw); + } state = State.NOT_RUNNING; } } @@ -180,29 +202,15 @@ /** * Mark phase of the GC. */ - private void mark() throws IOException, InterruptedException { + private void mark() throws IOException, DataStoreException { state = State.MARKING; LOG.debug("Starting mark phase of the garbage collector"); - // Find all blobs available in the blob store - ListenableFutureTask blobIdRetriever = ListenableFutureTask.create(new BlobIdRetriever()); - if (runConcurrently) { - executor.execute(blobIdRetriever); - } else { - MoreExecutors.sameThreadExecutor().execute(blobIdRetriever); - } - - // Find all blob references after iterating over the whole repository + // Mark all used references iterateNodeTree(); - try { - blobIdRetriever.get(); - } catch (ExecutionException e) { - LOG.warn("Error occurred while fetching all the blobIds from the BlobStore. 
GC would " + - "continue with the blobIds retrieved so far", e.getCause()); - } - - difference(); + // Move the marked references file if applicable + GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId); LOG.debug("Ending mark phase of the garbage collector"); } @@ -248,11 +256,33 @@ /** * Sweep phase of gc candidate deletion. - * - * @throws IOException - * Signals that an I/O exception has occurred. + *

+ * Performs the following steps: + *

    + *
  • Merge all marked references available. + *
  • Retrieve all blob ids available. + *
  • Diffs the 2 sets above to retrieve list of blob ids not used. + *
  • Deletes the above set. + *
+ * + * @return the number of blobs deleted + * @throws Exception the exception */ - private int sweep() throws IOException { + private int sweep() throws Exception { + long earliestRefAvailTime = System.currentTimeMillis(); + // Only go ahead if merge succeeded + try { + earliestRefAvailTime = + GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs); + } catch (Exception e) { + return 0; + } + + // Find all blob references after iterating over the whole repository + (new BlobIdRetriever()).call(); + + // Calculate the references not used + difference(); int count = 0; state = State.SWEEPING; LOG.debug("Starting sweep phase of the garbage collector"); @@ -268,13 +298,13 @@ if (ids.size() > getBatchCount()) { count += ids.size(); - executor.execute(new Sweeper(ids, exceptionQueue)); + executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime)); ids = Lists.newArrayList(); } } if (!ids.isEmpty()) { count += ids.size(); - executor.execute(new Sweeper(ids, exceptionQueue)); + executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime)); } count -= exceptionQueue.size(); @@ -292,6 +322,8 @@ LOG.warn("Unable to delete some blob entries from the blob store. Details around such blob entries " + "can be found in [{}]", fs.getGarbage().getAbsolutePath()); } + // Remove all the merged marked references + GarbageCollectionType.get(blobStore).removeAllMarkedReferences(blobStore); LOG.debug("Ending sweep phase of the garbage collector"); return count; } @@ -300,10 +332,10 @@ return batchCount; } - private long getLastMaxModifiedTime(){ + private long getLastMaxModifiedTime(long maxModified) { return maxLastModifiedInterval > 0 ? - System.currentTimeMillis() - maxLastModifiedInterval : 0; - + (maxModified <= 0 ? System.currentTimeMillis() : maxModified) + - maxLastModifiedInterval : 0; } /** @@ -327,9 +359,13 @@ /** The ids to sweep. 
*/ private final List ids; - public Sweeper(List ids, ConcurrentLinkedQueue exceptionQueue) { + private final long maxModified; + + public Sweeper(List ids, ConcurrentLinkedQueue exceptionQueue, + long maxModified) { this.exceptionQueue = exceptionQueue; this.ids = ids; + this.maxModified = maxModified; } @Override @@ -336,7 +372,7 @@ public void run() { try { LOG.debug("Blob ids to be deleted {}", ids); - boolean deleted = blobStore.deleteChunks(ids,getLastMaxModifiedTime()); + boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime(maxModified)); if (!deleted) { exceptionQueue.addAll(ids); } @@ -363,7 +399,7 @@ @Override public void addReference(String blobId) { if (debugMode) { - LOG.trace("BlobId : {}",blobId); + LOG.trace("BlobId : {}", blobId); } try { @@ -378,7 +414,7 @@ } if (debugMode) { - LOG.trace("chunkId : {}",id); + LOG.trace("chunkId : {}", id); } count.getAndIncrement(); } @@ -392,11 +428,11 @@ } } } - ); + ); LOG.info("Number of valid blob references marked under mark phase of " + - "Blob garbage collection [{}]",count.get()); + "Blob garbage collection [{}]", count.get()); // sort the marked references - fs.sort(fs.getMarkedRefs()); + GarbageCollectorFileState.sort(fs.getMarkedRefs()); } finally { IOUtils.closeQuietly(writer); } @@ -408,7 +444,7 @@ */ private class BlobIdRetriever implements Callable { @Override - public Integer call() throws Exception { + public Integer call() throws Exception { LOG.debug("Starting retrieve of all blobs"); BufferedWriter bufferWriter = null; int blobsCount = 0; @@ -415,7 +451,7 @@ try { bufferWriter = new BufferedWriter( new FileWriter(fs.getAvailableRefs())); - Iterator idsIter = blobStore.getAllChunkIds(getLastMaxModifiedTime()); + Iterator idsIter = blobStore.getAllChunkIds(0); List ids = Lists.newArrayList(); while (idsIter.hasNext()) { @@ -433,16 +469,14 @@ } // sort the file - fs.sort(fs.getAvailableRefs()); + GarbageCollectorFileState.sort(fs.getAvailableRefs()); LOG.debug("Number of blobs 
present in BlobStore : [{}] which have " + - "been last modified before [{}]", blobsCount, timestampToString(getLastMaxModifiedTime())); + "been last modified before [{}]", blobsCount, timestampToString(getLastMaxModifiedTime(0))); } finally { IOUtils.closeQuietly(bufferWriter); } return blobsCount; } - - } @@ -449,7 +483,7 @@ /** * FileLineDifferenceIterator class which iterates over the difference of 2 files line by line. */ - static class FileLineDifferenceIterator extends AbstractIterator implements Closeable{ + static class FileLineDifferenceIterator extends AbstractIterator implements Closeable { private final PeekingIterator peekMarked; private final LineIterator marked; private final LineIterator all; @@ -522,4 +556,110 @@ private static String timestampToString(long timestamp){ return (new Timestamp(timestamp) + "00").substring(0, 23); } + + /** + * Enum that defines the different data store types and encodes their divergent behavior. + */ + enum GarbageCollectionType { + SHARED { + /** + * Removes the marked references from the blob store root. + * + * @param blobStore the blob store + */ + @Override + void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) { + SharedDataStoreUtils.deleteRootRecords((SharedDataStore) blobStore, + SharedStoreRecordType.REFERENCES); + } + + /** + * Merge all marked references available and return the earliest time of the references. + * + * @param blobStore the blob store + * @param fs the garbage collector file state + * @return the earliest time of the available references + * @throws IOException Signals that an I/O exception has occurred. 
+ * @throws DataStoreException the data store exception + */ + @Override + long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, + GarbageCollectorFileState fs) + throws IOException, DataStoreException { + + List refFiles = + SharedDataStoreUtils + .getRootRecords((SharedDataStore) blobStore, + SharedStoreRecordType.REFERENCES); + // Get all the repositories registered + List repoFiles = + SharedDataStoreUtils + .getRootRecords((SharedDataStore) blobStore, + SharedStoreRecordType.REPOSITORY); + + // Retrieve repos for which reference files have not been created + Set unAvailRepos = + SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles); + if (unAvailRepos.isEmpty()) { + // List of files to be merged + List files = Lists.newArrayList(); + for (DataRecord refFile : refFiles) { + File file = GarbageCollectorFileState.copy(refFile.getStream()); + GarbageCollectorFileState.sort(file); + files.add(file); + } + + GarbageCollectorFileState.merge(files, fs.getMarkedRefs()); + + return SharedDataStoreUtils.getEarliestRecord(refFiles).getLastModified(); + } else { + LOG.error("Not all repositories have marked references available : {}", + unAvailRepos); + throw new IOException( + "Not all repositories have marked references available"); + } + } + + /** + * Adds the marked references to the blob store root. Default noop + * + * @param blobStore the blob store + * @param fs the fs + * @param repoId the repo id + * @throws DataStoreException the data store exception + * @throws IOException Signals that an I/O exception has occurred. 
+ */ + @Override + void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, + String repoId) throws DataStoreException, IOException { + InputStream is = new FileInputStream(fs.getMarkedRefs()); + try { + SharedDataStoreUtils.addRootRecord((SharedDataStore) blobStore, + SharedStoreRecordType.REFERENCES, is, repoId); + } finally { + Closeables.close(is, false); + } + } + }, + DEFAULT; + + void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) {} + + void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, + String repoId) throws DataStoreException, IOException {} + + long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, + GarbageCollectorFileState fs) + throws IOException, DataStoreException { + return System.currentTimeMillis(); + } + + public static GarbageCollectionType get(GarbageCollectableBlobStore blobStore) { + if (blobStore instanceof SharedDataStore + && ((SharedDataStore) blobStore).getType() == Type.SHARED) { + return SHARED; + } + return DEFAULT; + } + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (working copy) @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import java.io.InputStream; +import java.util.List; + +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; + +/** + * Interface to be implemented by a shared data store. + */ +public interface SharedDataStore { + /** + * Explicitly identifies the type of the data store. + */ + enum Type { + SHARED, DEFAULT; + } + + /** + * Adds the root record. + * + * @param stream the stream + * @param name the name of the root record + * @return the data record + * @throws DataStoreException the data store exception + */ + DataRecord addRootRecord(InputStream stream, String name) + throws DataStoreException; + + /** + * Gets all the root records with the given prefix. + * + * @return the root records whose names start with the given prefix + */ + List getAllRootRecords(String prefix); + + /** + * Deletes the root record with the given name. + * + * @param name the name of the root record + * @return true if the record was deleted, false otherwise + */ + boolean deleteRootRecord(String name); + + /** + * Deletes all records matching the given prefix. + * + * @param prefix the name prefix of the records to delete + */ + void deleteAllRootRecords(String prefix); + + /** + * Gets the type. 
+ * + * @return the type + */ + Type getType(); +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedStoreRecordType.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedStoreRecordType.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedStoreRecordType.java (working copy) @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; + +/** + * Encapsulates the different type of records at the data store root. 
/**
 * Encapsulates the different types of marker records kept at the data store
 * root. Record names have the form {@code <type>-<repositoryId>}.
 */
public enum SharedStoreRecordType {

    /** Record holding the marked blob references of one repository. */
    REFERENCES("references"),

    /** Record registering one repository against the shared data store. */
    REPOSITORY("repository");

    // NOTE(review): "DELIIM" is a misspelling of "delimiter"; kept as-is
    // because the constant is package-visible and may be referenced elsewhere.
    static final String DELIIM = "-";

    private final String type;

    SharedStoreRecordType(String type) {
        this.type = type;
    }

    /**
     * Gets the type prefix used in record names.
     *
     * @return the type prefix, e.g. {@code "references"}
     */
    public String getType() {
        return type;
    }

    /**
     * Extracts the repository id from a record name of the form
     * {@code <type>-<id>}. Only the first delimiter is significant, so ids
     * containing the delimiter are returned intact.
     *
     * @param name the record name
     * @return everything after the first delimiter
     * @throws IllegalArgumentException if the name contains no delimiter
     */
    public String getIdFromName(String name) {
        int sep = name.indexOf(DELIIM);
        if (sep < 0) {
            throw new IllegalArgumentException("Not a valid record name: " + name);
        }
        return name.substring(sep + 1);
    }

    /**
     * Builds the record name for the given repository id, i.e.
     * {@code <type>-<id>}. The name format is identical for every record
     * type, so this is a single concrete method rather than one abstract
     * override per constant.
     *
     * @param id the repository id
     * @return the record name
     */
    public String getNameFromId(String id) {
        return type + DELIIM + id;
    }
}
initializing " + "the MarkSweepGarbageCollector",e); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (working copy) @@ -18,6 +18,7 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Dictionary; @@ -41,6 +42,8 @@ import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.kernel.KernelNodeStore; @@ -49,9 +52,14 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGC; import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type; +import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore; import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceFactory; import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import 
org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.apache.jackrabbit.oak.spi.state.NodeStore; @@ -319,7 +327,27 @@ mkBuilder.setExecutor(executor); mk = mkBuilder.open(); + try { + String repoId = ClusterRepositoryInfo.createId(mk.getNodeStore()); + if (blobStore != null + && blobStore instanceof SharedDataStore + && ((SharedDataStore) blobStore).getType() == Type.SHARED) { + try { + SharedDataStoreUtils + .addRootRecord( + ((SharedDataStore) blobStore), + SharedStoreRecordType.REPOSITORY, + new ByteArrayInputStream(new byte[0]), + repoId); + } catch (DataStoreException e) { + throw new IOException(e); + } + } + } catch (CommitFailedException e1) { + throw new IOException("Could not register a unique repositoryId"); + } + registerJMXBeans(mk.getNodeStore()); registerLastRevRecoveryJob(mk.getNodeStore()); @@ -463,10 +491,18 @@ BlobGarbageCollector gc = new BlobGarbageCollector() { @Override public void collectGarbage() throws Exception { - store.createBlobGarbageCollector(blobGcMaxAgeInSecs).collectGarbage(); + collectGarbage(false); } + + @Override + public void collectGarbage(boolean sweep) throws Exception { + store.createBlobGarbageCollector(blobGcMaxAgeInSecs, + ClusterRepositoryInfo.getId(mk.getNodeStore().getRoot())) + .collectGarbage(sweep); + } }; - registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor), + registrations.add(registerMBean(whiteboard, BlobGCMBean.class, + new BlobGC(gc, executor), BlobGCMBean.TYPE, "Document node store blob garbage collection")); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java (working copy) @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.identifier; + +import java.util.UUID; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +/** + * Utility class to manage a unique cluster/repository id for the cluster. 
+ */ +public class ClusterRepositoryInfo { + public static final String CLUSTER_CONFIG = ":clusterConfig"; + public static final String CLUSTER_ID = ":clusterId"; + + public static String createId(NodeStore store) throws CommitFailedException { + NodeBuilder root = store.getRoot().builder(); + if (!root.hasChildNode(CLUSTER_CONFIG)) { + String id = UUID.randomUUID().toString(); + root.child(CLUSTER_CONFIG).setProperty(CLUSTER_ID, id); + store.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); + return id; + } else { + return root.getChildNode(CLUSTER_CONFIG).getProperty(CLUSTER_ID).getValue(Type.STRING); + } + } + + public static String getId(NodeState root) { + return root.getChildNode(CLUSTER_CONFIG).getProperty(CLUSTER_ID).getValue(Type.STRING); + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (revision 1615951) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (working copy) @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; +import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -34,6 +35,8 @@ import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.jackrabbit.core.data.DataStoreException; +import 
org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.osgi.ObserverTracker; import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; import org.apache.jackrabbit.oak.plugins.blob.BlobGC; @@ -40,6 +43,11 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type; +import org.apache.jackrabbit.oak.plugins.blob.SharedStoreRecordType; +import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; +import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; @@ -187,15 +195,42 @@ revisionGCRegistration = registerMBean(whiteboard, RevisionGCMBean.class, revisionGC, RevisionGCMBean.TYPE, "Segment node store revision garbage collection"); + try { + String repoId = ClusterRepositoryInfo.createId(delegate); + // If a shared data store register the repo id at the data store + if (blobStore != null + && blobStore instanceof SharedDataStore + && ((SharedDataStore) blobStore).getType() == Type.SHARED) { + try { + + SharedDataStoreUtils + .addRootRecord( + ((SharedDataStore) blobStore), + SharedStoreRecordType.REPOSITORY, + new ByteArrayInputStream(new byte[0]), + repoId); + } catch (DataStoreException e) { + throw new IOException("Could not register a unique repositoryId"); + } + } + } catch (CommitFailedException e1) { + throw new IOException(""); + } if (store.getBlobStore() instanceof GarbageCollectableBlobStore) { BlobGarbageCollector gc = new BlobGarbageCollector() { @Override public void collectGarbage() throws Exception { + collectGarbage(false); + } + + 
@Override + public void collectGarbage(boolean sweep) throws Exception { MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector( new SegmentBlobReferenceRetriever(store.getTracker()), (GarbageCollectableBlobStore) store.getBlobStore(), - executor); - gc.collectGarbage(); + executor, + ClusterRepositoryInfo.getId(delegate.getRoot())); + gc.collectGarbage(sweep); } };