Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGC.java (date 1441797458000) @@ -59,7 +59,11 @@ private final Executor executor; private ManagementOperation gcOp = done(OP_NAME, ""); - + + public static final String CONSISTENCY_OP_NAME = "Blob consistency check"; + + private ManagementOperation consistencyOp = done(CONSISTENCY_OP_NAME, ""); + /** * @param blobGarbageCollector Blob garbage collector * @param executor executor for running the garbage collection task @@ -112,6 +116,23 @@ throw new IllegalStateException(e); } return tds; + } + + @Override + public CompositeData checkConsistency() { + if (consistencyOp.isDone()) { + consistencyOp = newManagementOperation(CONSISTENCY_OP_NAME, new Callable() { + @Override + public String call() throws Exception { + long t0 = nanoTime(); + long missing = blobGarbageCollector.checkConsistency(); + return missing + "missing blobs found (details in the log). Consistency check completed in " + + formatTime(nanoTime() - t0); + } + }); + executor.execute(consistencyOp); + } + return consistencyOp.getStatus().toCompositeData(); } private CompositeDataSupport toCompositeData(GarbageCollectionRepoStats statObj) throws OpenDataException { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCMBean.java (date 1441797458000) @@ -61,5 +61,11 @@ * @return List of available repositories and their status */ TabularData getGlobalMarkStats(); - + + /** + * Data Store consistency check + * + * @return the missing blobs + */ + CompositeData checkConsistency(); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (date 1441797458000) @@ -39,4 +39,12 @@ * @throws Exception */ List getStats() throws Exception; + + /** + * Checks for consistency in the blob store and reporting the number of missing blobs. + * + * @return number of inconsistencies + * @throws Exception + */ + long checkConsistency() throws Exception; } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (date 1441797458000) @@ -134,7 +134,21 @@ merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted); Files.move(sorted, file); } - + + /** + * Sorts the given file externally with the given comparator. + * + * @param file file whose contents needs to be sorted + * @param comparator to compare + * @throws IOException + */ + public static void sort(File file, Comparator comparator) throws IOException { + File sorted = createTempFile(); + merge(ExternalSort.sortInBatch(file, comparator, true), sorted); + Files.move(sorted, file); + } + + public static void merge(List files, File output) throws IOException { ExternalSort.mergeSortedFiles( files, Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (date 1441797458000) @@ -26,12 +26,14 @@ import java.io.InputStreamReader; import java.io.LineNumberReader; import java.sql.Timestamp; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -49,6 +51,7 @@ import com.google.common.io.Closeables; import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFutureTask; import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; import org.apache.jackrabbit.core.data.DataRecord; @@ -80,7 +83,9 @@ public static final String TEMP_DIR = StandardSystemProperty.JAVA_IO_TMPDIR.value(); public static final int DEFAULT_BATCH_COUNT = 2048; - + + public static final String DELIM = ","; + /** The last modified time before current time of blobs to consider for garbage collection. */ private final long maxLastModifiedInterval; @@ -265,13 +270,18 @@ FileLineDifferenceIterator iter = new FileLineDifferenceIterator( fs.getMarkedRefs(), fs.getAvailableRefs()); + calculateDifference(fs, iter); + LOG.debug("Ending difference phase of the garbage collector"); + } + + private long calculateDifference(GarbageCollectorFileState fs, FileLineDifferenceIterator iter) throws IOException { + long numCandidates = 0; BufferedWriter bufferWriter = null; try { bufferWriter = Files.newWriter(fs.getGcCandidates(), Charsets.UTF_8); List expiredSet = newArrayList(); - int numCandidates = 0; while (iter.hasNext()) { expiredSet.add(iter.next()); if (expiredSet.size() > getBatchCount()) { @@ -284,15 +294,14 @@ numCandidates += expiredSet.size(); saveBatchToFile(expiredSet, bufferWriter); } - LOG.debug("Found GC candidates - " + numCandidates); + LOG.debug("Found candidates - " + numCandidates); } finally { IOUtils.closeQuietly(bufferWriter); IOUtils.closeQuietly(iter); } - - LOG.debug("Ending difference phase of the garbage collector"); + return numCandidates; } - + /** * Sweep phase of gc candidate deletion. *

@@ -456,17 +465,19 @@ private final boolean debugMode = LOG.isTraceEnabled(); @Override - public void addReference(String blobId) { + public void addReference(String blobId, String nodeId) { if (debugMode) { - LOG.trace("BlobId : {}", blobId); + LOG.trace("BlobId : {}, NodeId : {}", blobId, nodeId); } try { Iterator idIter = blobStore.resolveChunks(blobId); + Joiner delimJoiner = Joiner.on(DELIM).skipNulls(); while (idIter.hasNext()) { String id = idIter.next(); - idBatch.add(id); - + + idBatch.add(delimJoiner.join(id, nodeId)); + if (idBatch.size() >= getBatchCount()) { saveBatchToFile(idBatch, writer); idBatch.clear(); @@ -490,14 +501,65 @@ ); LOG.info("Number of valid blob references marked under mark phase of " + "Blob garbage collection [{}]", count.get()); - // sort the marked references - GarbageCollectorFileState.sort(fs.getMarkedRefs()); + // sort the marked references with the first part of the key + GarbageCollectorFileState.sort(fs.getMarkedRefs(), + new Comparator() { + @Override + public int compare(String s1, String s2) { + return s1.split(DELIM)[0].compareTo(s2.split(DELIM)[0]); + } + }); } finally { IOUtils.closeQuietly(writer); } } - + + /** + * Checks for the DataStore consistency and reports the number of missing blobs still referenced. + * + * @return the missing blobs + * @throws Exception + */ + @Override + public long checkConsistency() throws Exception { + boolean threw = true; + GarbageCollectorFileState fs = new GarbageCollectorFileState(root); + long candidates = 0; - + + try { + Stopwatch sw = Stopwatch.createStarted(); + LOG.info("Starting blob consistency check"); + + // Find all blobs available in the blob store + ListenableFutureTask blobIdRetriever = ListenableFutureTask.create(new BlobIdRetriever(fs)); + executor.execute(blobIdRetriever); + + // Mark all used blob references + iterateNodeTree(fs); + + try { + blobIdRetriever.get(); + } catch (ExecutionException e) { + LOG.warn("Error occurred while fetching all the blobIds from the BlobStore"); + throw e; + } + + LOG.trace("Starting difference phase of the consistency check"); + FileLineDifferenceIterator iter = new FileLineDifferenceIterator(fs.getAvailableRefs(), fs.getMarkedRefs()); + candidates = calculateDifference(fs, iter); + LOG.trace("Ending difference phase of the consistency check"); + + if (candidates > 0) { + LOG.warn("Consistency check failure in the the blob store : {}, check missing candidates in file {}", + blobStore, fs.getGcCandidates().getAbsolutePath()); + } + } finally { + if (!LOG.isTraceEnabled() || candidates == 0) { + Closeables.close(fs, threw); + } + } + return candidates; + } /** * BlobIdRetriever class to retrieve all blob ids. */ @@ -578,7 +640,11 @@ LineIterator.closeQuietly(marked); LineIterator.closeQuietly(all); } - + + private String getKey(String row) { + return row.split(DELIM)[0]; + } + private String computeNextDiff() { if (!all.hasNext()) { return null; @@ -594,7 +660,7 @@ diff = all.next(); while (peekMarked.hasNext()) { String marked = peekMarked.peek(); - int comparisonResult = diff.compareTo(marked); + int comparisonResult = getKey(diff).compareTo(getKey(marked)); if (comparisonResult > 0) { //Extra entries in marked. Ignore them and move on peekMarked.next(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java (date 1441797458000) @@ -18,14 +18,21 @@ */ package org.apache.jackrabbit.oak.plugins.blob; +import javax.annotation.Nullable; + /** * Callback interface for collecting all blob references that are * potentially accessible. Useful for marking referenced blobs as * in use when collecting garbage in an external data store. */ public interface ReferenceCollector { - + - void addReference(String reference); - + /** + * Adds the reference detected with the node Id. + * + * @param reference + * @param nodeId + */ + void addReference(String reference, @Nullable String nodeId); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferencedBlob.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferencedBlob.java (date 1441797458000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferencedBlob.java (date 1441797458000) @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import org.apache.jackrabbit.oak.api.Blob; + +/** + * Exposes the blob along with the Node id from which referenced + */ +public class ReferencedBlob { + private Blob blob; + + private String id; + + public ReferencedBlob(Blob blob, String id) { + this.setBlob(blob); + this.setId(id); + } + + public Blob getBlob() { + return blob; + } + + public void setBlob(Blob blob) { + this.blob = blob; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public String toString() { + return "ReferencedBlob{" + + "blob=" + blob + + ", id='" + id + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ReferencedBlob that = (ReferencedBlob) o; + + if (!getBlob().equals(that.getBlob())) { + return false; + } + return !(getId() != null ? !getId().equals(that.getId()) : that.getId() != null); + + } + + @Override + public int hashCode() { + int result = getBlob().hashCode(); + result = 31 * result + (getId() != null ? getId().hashCode() : 0); + return result; + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobCollector.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobCollector.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobCollector.java (date 1441797458000) @@ -27,6 +27,7 @@ import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.commons.json.JsopReader; import org.apache.jackrabbit.oak.commons.json.JsopTokenizer; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.plugins.document.util.Utils; public class BlobCollector { @@ -36,7 +37,7 @@ this.nodeStore = nodeStore; } - public void collect(NodeDocument doc, Collection blobs) { + public void collect(NodeDocument doc, Collection blobs) { for (String key : doc.keySet()) { if (!Utils.isPropertyName(key)) { continue; @@ -44,13 +45,13 @@ Map valueMap = doc.getLocalMap(key); for (String v : valueMap.values()) { if (v != null) { - loadValue(v, blobs); + loadValue(v, blobs, doc.getId()); } } } } - private void loadValue(String v, Collection blobs) { + private void loadValue(String v, Collection blobs, String nodeId) { JsopReader reader = new JsopTokenizer(v); PropertyState p; if (reader.matches('[')) { @@ -58,14 +59,14 @@ if (p.getType() == Type.BINARIES) { for (int i = 0; i < p.count(); i++) { Blob b = p.getValue(Type.BINARY, i); - blobs.add(b); + blobs.add(new ReferencedBlob(b, nodeId)); } } } else { p = DocumentPropertyState.readProperty("x", nodeStore, reader); if (p.getType() == Type.BINARY) { Blob b = p.getValue(Type.BINARY); - blobs.add(b); + blobs.add(new ReferencedBlob(b, nodeId)); } } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java (date 1441797458000) @@ -21,7 +21,7 @@ import java.util.List; import java.util.NoSuchElementException; -import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; /** * An iterator over all referenced binaries. @@ -30,13 +30,13 @@ * The items are returned in no particular order. * An item might be returned multiple times. */ -public class BlobReferenceIterator implements Iterator { +public class BlobReferenceIterator implements Iterator { private static final int BATCH_SIZE = 1000; private final DocumentStore docStore; private final BlobCollector blobCollector; - private HashSet batch = new HashSet(); - private Iterator batchIterator; + private HashSet batch = new HashSet(); + private Iterator batchIterator; private boolean done; private String fromKey = NodeDocument.MIN_ID_VALUE; @@ -55,7 +55,7 @@ } @Override - public Blob next() { + public ReferencedBlob next() { // this will load the next batch if required if (!hasNext()) { throw new NoSuchElementException(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java (date 1441797458000) @@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever; import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,23 +44,24 @@ @Override public void collectReferences(ReferenceCollector collector) { int referencesFound = 0; - Iterator blobIterator = nodeStore.getReferencedBlobsIterator(); + Iterator blobIterator = nodeStore.getReferencedBlobsIterator(); try { while (blobIterator.hasNext()) { - Blob blob = blobIterator.next(); + ReferencedBlob refBlob = blobIterator.next(); + Blob blob = refBlob.getBlob(); referencesFound++; //TODO this mode would also add in memory blobId //Would that be an issue if (blob instanceof BlobStoreBlob) { - collector.addReference(((BlobStoreBlob) blob).getBlobId()); + collector.addReference(((BlobStoreBlob) blob).getBlobId(), refBlob.getId()); } else { //TODO Should not rely on toString. Instead obtain //secure reference and convert that to blobId using //blobStore - collector.addReference(blob.toString()); + collector.addReference(blob.toString(), refBlob.getId()); } } }finally{ Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (date 1441797458000) @@ -81,6 +81,7 @@ import org.apache.jackrabbit.oak.commons.json.JsopTokenizer; import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.plugins.document.Checkpoints.Info; import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator; @@ -2686,7 +2687,7 @@ * @see org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator * @return an iterator for all the blobs */ - public Iterator getReferencedBlobsIterator() { + public Iterator getReferencedBlobsIterator() { if(store instanceof MongoDocumentStore){ return new MongoBlobReferenceIterator(this, (MongoDocumentStore) store); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java (date 1441797458000) @@ -28,16 +28,16 @@ import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.QueryBuilder; -import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.plugins.document.BlobCollector; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; -public class MongoBlobReferenceIterator extends AbstractIterator implements Closeable { +public class MongoBlobReferenceIterator extends AbstractIterator implements Closeable { private final MongoDocumentStore documentStore; private final BlobCollector blobCollector; - private final Queue blobs = Queues.newArrayDeque(); + private final Queue blobs = Queues.newArrayDeque(); private DBCursor cursor; @@ -48,7 +48,7 @@ } @Override - protected Blob computeNext() { + protected ReferencedBlob computeNext() { if (blobs.isEmpty()) { loadBatch(); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (date 1441792207000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (date 1441797458000) @@ -348,7 +348,7 @@ for (int i = 0; i < blobrefcount; i++) { int offset = (data.getShort(blobrefpos + i * 2) & 0xffff) << 2; SegmentBlob blob = new SegmentBlob(new RecordId(id, offset)); - collector.addReference(blob.getBlobId()); + collector.addReference(blob.getBlobId(), null); } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobCollectorTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobCollectorTest.java (date 1441792207000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobCollectorTest.java (date 1441797458000) @@ -25,6 +25,7 @@ import com.google.common.collect.Lists; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.plugins.memory.PropertyBuilder; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; @@ -48,7 +49,7 @@ @Test public void testCollect() throws Exception { NodeBuilder b1 = store.getRoot().builder(); - List blobs = Lists.newArrayList(); + List blobs = Lists.newArrayList(); b1.child("x").child("y"); store.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); @@ -58,7 +59,7 @@ b1 = store.getRoot().builder(); Blob b = store.createBlob(randomStream(i, 4096)); b1.child("x").child("y").setProperty("b" + i, b); - blobs.add(b); + blobs.add(new ReferencedBlob(b, "2:/x/y")); store.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); } @@ -68,7 +69,7 @@ for(int i = 0; i < 2; i++){ Blob b = store.createBlob(randomStream(i, 4096)); p1.addValue(b); - blobs.add(b); + blobs.add(new ReferencedBlob(b, "2:/x/y")); } b1 = store.getRoot().builder(); b1.child("x").child("y").setProperty(p1.getPropertyState()); @@ -80,16 +81,16 @@ //Change the see to create diff binary Blob b = store.createBlob(randomStream(i+1, 4096)); b1.child("x").child("y").setProperty("b" + i, b); - blobs.add(b); + blobs.add(new ReferencedBlob(b, "2:/x/y")); store.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); } NodeDocument doc = store.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath("/x/y")); - List collectedBlobs = Lists.newArrayList(); + List collectedBlobs = Lists.newArrayList(); blobCollector.collect(doc, collectedBlobs); assertEquals(blobs.size(), collectedBlobs.size()); - assertEquals(new HashSet(blobs), new HashSet(collectedBlobs)); + assertEquals(new HashSet(blobs), new HashSet(collectedBlobs)); } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java (date 1441792207000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java (date 1441797458000) @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -76,19 +77,19 @@ @Test public void testBlobIterator() throws Exception{ - List blobs = Lists.newArrayList(); + List blobs = Lists.newArrayList(); //1. Set some single value Binary property for(int i = 0; i < 10; i++){ NodeBuilder b1 = store.getRoot().builder(); Blob b = store.createBlob(randomStream(i, 4096)); b1.child("x").child("y"+1).setProperty("b" + i, b); - blobs.add(b); + blobs.add(new ReferencedBlob(b, "2:/x/y" + 1)); store.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); } - List collectedBlobs = ImmutableList.copyOf(store.getReferencedBlobsIterator()); + List collectedBlobs = ImmutableList.copyOf(store.getReferencedBlobsIterator()); assertEquals(blobs.size(), collectedBlobs.size()); - assertEquals(new HashSet(blobs), new HashSet(collectedBlobs)); + assertEquals(new HashSet(blobs), new HashSet(collectedBlobs)); } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceTest.java (date 1441792207000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceTest.java (date 1441797458000) @@ -25,6 +25,7 @@ import java.util.Random; import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -42,13 +43,13 @@ HashSet set = new HashSet(); for (int i = 0; i < 100; i++) { Blob b = a.createBlob(randomStream(i, 10)); - set.add(b.toString()); + set.add(new ReferencedBlob(b, "1:/c" + i).toString()); a.child("c" + i).setProperty("x", b); } s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY); - Iterator it = s.getReferencedBlobsIterator(); + Iterator it = s.getReferencedBlobsIterator(); while (it.hasNext()) { - Blob b = it.next(); + ReferencedBlob b = it.next(); set.remove(b.toString()); } assertTrue(set.isEmpty()); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (date 1441792207000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (date 1441797458000) @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.mongodb.BasicDBObject; @@ -169,8 +170,59 @@ Set existingAfterGC = gc(0); assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty()); } - + + @Test + public void consistencyCheckInit() throws Exception { + DataStoreState state = setUp(true); + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gcObj = init(86400, executor); + long candidates = gcObj.checkConsistency(); + assertEquals(1, executor.getTaskCount()); + assertEquals(0, candidates); + } + + @Test + public void consistencyCheckWithGc() throws Exception { + DataStoreState state = setUp(true); + Set existingAfterGC = gc(0); + assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty()); + + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gcObj = init(86400, executor); + long candidates = gcObj.checkConsistency(); + assertEquals(1, executor.getTaskCount()); + assertEquals(0, candidates); + } + + @Test + public void consistencyCheckWithRenegadeDelete() throws Exception { + DataStoreState state = setUp(true); + + // Simulate faulty state by deleting some blobs directly + Random rand = new Random(87); + List existing = Lists.newArrayList(state.blobsPresent); + + GarbageCollectableBlobStore store = (GarbageCollectableBlobStore) + mk.getNodeStore().getBlobStore(); + long count = store.countDeleteChunks(ImmutableList.of(existing.get(rand.nextInt(existing.size()))), 0); + + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gcObj = init(86400, executor); + long candidates = gcObj.checkConsistency(); + assertEquals(1, executor.getTaskCount()); + assertEquals(count, candidates); + } + private Set gc(int blobGcMaxAgeInSecs) throws Exception { + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gc = init(blobGcMaxAgeInSecs, executor); + gc.collectGarbage(false); + + assertEquals(0, executor.getTaskCount()); + return iterate(); + } + + private MarkSweepGarbageCollector init(int blobGcMaxAgeInSecs, ThreadPoolExecutor executor) throws Exception { DocumentNodeStore store = mk.getNodeStore(); String repoId = null; if (SharedDataStoreUtils.isShared(store.getBlobStore())) { @@ -179,14 +231,10 @@ new ByteArrayInputStream(new byte[0]), REPOSITORY.getNameFromId(repoId)); } - ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector( new DocumentBlobReferenceRetriever(store), (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, blobGcMaxAgeInSecs, repoId); - gc.collectGarbage(false); - - assertEquals(0, executor.getTaskCount()); - return iterate(); + return gc; } protected Set iterate() throws Exception { Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobIT.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobIT.java (date 1441792207000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobIT.java (date 1441797458000) @@ -118,7 +118,7 @@ final List refrences = Lists.newArrayList(); store.getTracker().collectBlobReferences(new ReferenceCollector() { @Override - public void addReference(String reference) { + public void addReference(String reference, String nodeId) { assertNotNull(reference); refrences.add(reference); } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java (date 1441792207000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java (date 1441797458000) @@ -226,27 +226,71 @@ assertTrue(Sets.symmetricDifference(state.blobsAdded, existingAfterGC).isEmpty()); } + @Test + public void consistencyCheckInit() throws Exception { + DataStoreState state = setUp(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gcObj = init(86400, executor); + long candidates = gcObj.checkConsistency(); + assertEquals(1, executor.getTaskCount()); + assertEquals(0, candidates); + } + + @Test + public void consistencyCheckWithGc() throws Exception { + DataStoreState state = setUp(); + Set existingAfterGC = gcInternal(0); + assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty()); + + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gcObj = init(86400, executor); + long candidates = gcObj.checkConsistency(); + assertEquals(1, executor.getTaskCount()); + assertEquals(0, candidates); + } + + @Test + public void consistencyCheckWithRenegadeDelete() throws Exception { + DataStoreState state = setUp(); + + // Simulate faulty state by deleting some blobs directly + Random rand = new Random(87); + List existing = Lists.newArrayList(state.blobsPresent); + + long count = blobStore.countDeleteChunks(ImmutableList.of(existing.get(rand.nextInt(existing.size()))), 0); + + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gcObj = init(86400, executor); + long candidates = gcObj.checkConsistency(); + assertEquals(1, executor.getTaskCount()); + assertEquals(count, candidates); + } + private Set gcInternal(long maxBlobGcInSecs) throws Exception { + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + MarkSweepGarbageCollector gc = init(maxBlobGcInSecs, executor); + gc.collectGarbage(false); + + assertEquals(0, executor.getTaskCount()); + Set existingAfterGC = iterate(); + log.info("{} blobs existing after gc : {}", existingAfterGC.size(), existingAfterGC); + return existingAfterGC; + } + + private MarkSweepGarbageCollector init(long blobGcMaxAgeInSecs, ThreadPoolExecutor executor) throws Exception { String repoId = null; if (SharedDataStoreUtils.isShared(store.getBlobStore())) { repoId = ClusterRepositoryInfo.createId(nodeStore); ((SharedDataStore) store.getBlobStore()).addMetadataRecord( - new ByteArrayInputStream(new byte[0]), + new ByteArrayInputStream(new byte[0]), REPOSITORY.getNameFromId(repoId)); } - - ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector( - new SegmentBlobReferenceRetriever(store.getTracker()), + new SegmentBlobReferenceRetriever(store.getTracker()), - (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 2048, maxBlobGcInSecs, + (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 2048, blobGcMaxAgeInSecs, - repoId); + repoId); - gc.collectGarbage(false); - - assertEquals(0, executor.getTaskCount()); - Set existingAfterGC = iterate(); - log.info("{} blobs existing after gc : {}", existingAfterGC.size(), existingAfterGC); - return existingAfterGC; + return gc; - } + } protected Set iterate() throws Exception { Iterator cur = blobStore.getAllChunkIds(0);