Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java (revision 1786119)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java (working copy)
@@ -20,6 +20,7 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import static com.google.common.collect.Iterables.filter;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getAllDocuments;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getSelectedDocuments;
 
@@ -39,12 +40,39 @@
         this.store = store;
     }
 
-    public Iterable<NodeDocument> getPossiblyDeletedDocs(final long lastModifiedTime) {
+    /**
+     * Returns documents that have a {@link NodeDocument#MODIFIED_IN_SECS} value
+     * within the given range and the {@link NodeDocument#DELETED_ONCE} flag set
+     * to {@code true}. The two passed modified timestamps are in milliseconds
+     * since the epoch and the implementation will convert them to seconds at
+     * the granularity of the {@link NodeDocument#MODIFIED_IN_SECS} field and
+     * then perform the comparison.
+     *
+     * @param fromModified the lower bound modified timestamp (inclusive)
+     * @param toModified the upper bound modified timestamp (exclusive)
+     * @return matching documents.
+     */
+    public Iterable<NodeDocument> getPossiblyDeletedDocs(final long fromModified,
+                                                         final long toModified) {
         return filter(getSelectedDocuments(store, NodeDocument.DELETED_ONCE, 1), new Predicate<NodeDocument>() {
             @Override
             public boolean apply(NodeDocument input) {
-                return input.wasDeletedOnce() && !input.hasBeenModifiedSince(lastModifiedTime);
+                return input.wasDeletedOnce()
+                        && modifiedGreaterThanEquals(input, fromModified)
+                        && modifiedLessThan(input, toModified);
             }
+
+            private boolean modifiedGreaterThanEquals(NodeDocument doc,
+                                                      long time) {
+                Long modified = doc.getModified();
+                return modified != null && modified.compareTo(getModifiedInSecs(time)) >= 0;
+            }
+
+            private boolean modifiedLessThan(NodeDocument doc,
+                                             long time) {
+                Long modified = doc.getModified();
+                return modified != null && modified.compareTo(getModifiedInSecs(time)) < 0;
+            }
         });
     }
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java (revision 1786119)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java (working copy)
@@ -81,11 +81,23 @@
     private static final Set<NodeDocument.SplitDocType> GC_TYPES = EnumSet.of(
             DEFAULT_LEAF, COMMIT_ROOT_ONLY);
 
+    /**
+     * Id of the document in the settings collection that keeps version GC state.
+     */
+    private static final String SETTINGS_COLLECTION_ID = "versionGC";
+
+    /**
+     * Name of the property holding the upper bound timestamp of the last GC run that was not canceled.
+     */
+    private static final String SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP = "lastOldestTimeStamp";
+
     VersionGarbageCollector(DocumentNodeStore nodeStore,
                             VersionGCSupport gcSupport) {
         this.nodeStore = nodeStore;
         this.versionStore = gcSupport;
         this.ds = nodeStore.getDocumentStore();
+
+        createSettingDocIfNotExist();
     }
 
     public VersionGCStats gc(long maxRevisionAge, TimeUnit unit) throws IOException {
@@ -259,9 +271,11 @@
         final long oldestRevTimeStamp = nodeStore.getClock().getTime() - maxRevisionAgeInMillis;
         final RevisionVector headRevision = nodeStore.getHeadRevision();
 
-        log.info("Starting revision garbage collection. Revisions older than [{}] will be " +
-                "removed", Utils.timestampToString(oldestRevTimeStamp));
+        final long lastOldestTimeStamp = getLastOldestTimeStamp();
+        log.info("Starting revision garbage collection. Revisions older than [{}] and newer than [{}] will be removed",
+                Utils.timestampToString(oldestRevTimeStamp), Utils.timestampToString(lastOldestTimeStamp));
 
+
         //Check for any registered checkpoint which prevent the GC from running
         Revision checkpoint = nodeStore.getCheckpoints().getOldestRevisionToKeep();
         if (checkpoint != null && checkpoint.getTimestamp() < oldestRevTimeStamp) {
@@ -274,11 +288,16 @@
             return phases.stats;
         }
 
-        collectDeletedDocuments(phases, headRevision, oldestRevTimeStamp);
+        collectDeletedDocuments(phases, headRevision, lastOldestTimeStamp, oldestRevTimeStamp);
         collectSplitDocuments(phases, oldestRevTimeStamp);
 
         phases.close();
         phases.stats.canceled = cancel.get();
+
+        if (!cancel.get()) {
+            setLastOldestTimeStamp(oldestRevTimeStamp);
+        }
+
         log.info("Revision garbage collection finished in {}. {}", phases.elapsed, phases.stats);
         return phases.stats;
     }
@@ -292,13 +311,14 @@
 
     private void collectDeletedDocuments(GCPhases phases,
                                          RevisionVector headRevision,
-                                         long oldestRevTimeStamp)
+                                         final long lastOldestTimeStamp,
+                                         final long oldestRevTimeStamp)
             throws IOException {
         int docsTraversed = 0;
         DeletedDocsGC gc = new DeletedDocsGC(headRevision, cancel);
         try {
             if (phases.start(GCPhase.COLLECTING)) {
-                Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(oldestRevTimeStamp);
+                Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(lastOldestTimeStamp, oldestRevTimeStamp);
                 try {
                     for (NodeDocument doc : itr) {
                         // continue with GC?
@@ -364,6 +384,39 @@
         }
     }
 
+    /**
+     * Returns the upper bound timestamp persisted by the last GC run that was
+     * not canceled.
+     *
+     * @return the persisted timestamp in milliseconds since the epoch, or
+     *         {@code 0} if the settings document or the property is missing.
+     */
+    private long getLastOldestTimeStamp() {
+        Document versionGCDoc = ds.find(Collection.SETTINGS, SETTINGS_COLLECTION_ID, 0);
+        if (versionGCDoc == null) {
+            return 0;
+        }
+        Object value = versionGCDoc.get(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP);
+        // do not rely on the exact boxed type returned by the DocumentStore
+        return value instanceof Number ? ((Number) value).longValue() : 0;
+    }
+
+    /** Persists the given timestamp in the version GC settings document. */
+    private void setLastOldestTimeStamp(long lastGCRunTime) {
+        UpdateOp updateOp = new UpdateOp(SETTINGS_COLLECTION_ID, false);
+        updateOp.set(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP, lastGCRunTime);
+        ds.createOrUpdate(Collection.SETTINGS, updateOp);
+    }
+
+    /** Creates the version GC settings document with an initial timestamp of 0 if it is missing. */
+    private void createSettingDocIfNotExist() {
+        if (ds.find(Collection.SETTINGS, SETTINGS_COLLECTION_ID) == null) {
+            UpdateOp updateOp = new UpdateOp(SETTINGS_COLLECTION_ID, true);
+            updateOp.set(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP, 0L);
+            ds.create(Collection.SETTINGS, Lists.newArrayList(updateOp));
+        }
+    }
+
     /**
      * A helper class to remove document for deleted nodes.
      */
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java (revision 1786119)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java (working copy)
@@ -57,11 +57,6 @@
 public class MongoVersionGCSupport extends VersionGCSupport {
     private static final Logger LOG = LoggerFactory.getLogger(MongoVersionGCSupport.class);
     private final MongoDocumentStore store;
-    /**
-     * Disables the index hint sent to MongoDB.
-     */
-    private final boolean disableIndexHint =
-            Boolean.getBoolean("oak.mongo.disableVersionGCIndexHint");
 
     /**
      * The batch size for the query of possibly deleted docs.
@@ -75,17 +70,13 @@
     }
 
     @Override
-    public CloseableIterable<NodeDocument> getPossiblyDeletedDocs(final long lastModifiedTime) {
-        //_deletedOnce == true && _modified < lastModifiedTime
-        DBObject query =
-                start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE)
-                .put(NodeDocument.MODIFIED_IN_SECS).lessThan(NodeDocument.getModifiedInSecs(lastModifiedTime))
-                .get();
+    public CloseableIterable<NodeDocument> getPossiblyDeletedDocs(final long fromModified, final long toModified) {
+        //_deletedOnce == true && _modified >= fromModified && _modified < toModified
+        DBObject query = start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE).put(NodeDocument.MODIFIED_IN_SECS)
+                .greaterThanEquals(NodeDocument.getModifiedInSecs(fromModified))
+                .lessThan(NodeDocument.getModifiedInSecs(toModified)).get();
         DBCursor cursor = getNodeCollection().find(query).setReadPreference(ReadPreference.secondaryPreferred());
         cursor.batchSize(batchSize);
-        if (!disableIndexHint) {
-            cursor.hint(new BasicDBObject(NodeDocument.DELETED_ONCE, 1));
-        }
 
         return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() {
             @Override
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java (revision 1786119)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java (working copy)
@@ -47,10 +47,11 @@
     }
 
     @Override
-    public Iterable<NodeDocument> getPossiblyDeletedDocs(final long lastModifiedTime) {
+    public Iterable<NodeDocument> getPossiblyDeletedDocs(final long fromModified, final long toModified) {
         List<QueryCondition> conditions = new ArrayList<QueryCondition>();
         conditions.add(new QueryCondition(NodeDocument.DELETED_ONCE, "=", 1));
-        conditions.add(new QueryCondition(NodeDocument.MODIFIED_IN_SECS, "<", NodeDocument.getModifiedInSecs(lastModifiedTime)));
+        conditions.add(new QueryCondition(NodeDocument.MODIFIED_IN_SECS, "<", NodeDocument.getModifiedInSecs(toModified)));
+        conditions.add(new QueryCondition(NodeDocument.MODIFIED_IN_SECS, ">=", NodeDocument.getModifiedInSecs(fromModified)));
         return getIterator(RDBDocumentStore.EMPTY_KEY_PATTERN, conditions);
     }
 
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java (nonexistent)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java (working copy)
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport;
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBVersionGCSupport;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture.MEMORY;
+import static org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture.MONGO;
+import static org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture.RDB_H2;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class VersionGCSupportTest {
+
+    private DocumentStoreFixture fixture;
+    private DocumentStore store;
+    private VersionGCSupport gcSupport;
+    private List<String> ids = Lists.newArrayList();
+
+    @Parameterized.Parameters(name="{0}")
+    public static java.util.Collection<Object[]> fixtures() {
+        List<Object[]> fixtures = Lists.newArrayList();
+        if (RDB_H2.isAvailable()) {
+            RDBDocumentStore store = (RDBDocumentStore) RDB_H2.createDocumentStore();
+            fixtures.add(new Object[]{RDB_H2, store, new RDBVersionGCSupport(store)});
+        }
+        if (MONGO.isAvailable()) {
+            MongoDocumentStore store = (MongoDocumentStore) MONGO.createDocumentStore();
+            fixtures.add(new Object[]{MONGO, store, new MongoVersionGCSupport(store)});
+        }
+        if (MEMORY.isAvailable()) {
+            DocumentStore store = new MemoryDocumentStore();
+            fixtures.add(new Object[]{MEMORY, store, new VersionGCSupport(store)});
+        }
+        return fixtures;
+    }
+
+    public VersionGCSupportTest(DocumentStoreFixture fixture,
+                                DocumentStore store,
+                                VersionGCSupport gcSupport) {
+        this.fixture = fixture;
+        this.store = store;
+        this.gcSupport = gcSupport;
+    }
+
+    @After
+    public void after() throws Exception {
+        store.remove(Collection.NODES, ids);
+        fixture.dispose();
+    }
+
+    @Test
+    public void getPossiblyDeletedDocs() {
+        long offset = SECONDS.toMillis(42);
+        for (int i = 0; i < 5; i++) {
+            Revision r = new Revision(offset + SECONDS.toMillis(i), 0, 1);
+            String id = Utils.getIdFromPath("/doc-" + i);
+            ids.add(id);
+            UpdateOp op = new UpdateOp(id, true);
+            NodeDocument.setModified(op, r);
+            NodeDocument.setDeleted(op, r, true);
+            store.create(Collection.NODES, Lists.newArrayList(op));
+        }
+
+        assertPossiblyDeleted(0, 41, 0);
+        assertPossiblyDeleted(0, 42, 0);
+        assertPossiblyDeleted(0, 44, 0);
+        assertPossiblyDeleted(0, 45, 3);
+        assertPossiblyDeleted(0, 46, 3);
+        assertPossiblyDeleted(0, 49, 3);
+        assertPossiblyDeleted(0, 50, 5);
+        assertPossiblyDeleted(0, 51, 5);
+        assertPossiblyDeleted(39, 60, 5);
+        assertPossiblyDeleted(40, 60, 5);
+        assertPossiblyDeleted(41, 60, 5);
+        assertPossiblyDeleted(42, 60, 5);
+        assertPossiblyDeleted(44, 60, 5);
+        assertPossiblyDeleted(45, 60, 2);
+        assertPossiblyDeleted(47, 60, 2);
+        assertPossiblyDeleted(48, 60, 2);
+        assertPossiblyDeleted(49, 60, 2);
+        assertPossiblyDeleted(50, 60, 0);
+        assertPossiblyDeleted(51, 60, 0);
+    }
+
+    private void assertPossiblyDeleted(long fromSeconds, long toSeconds, long num) {
+        Iterable<NodeDocument> docs = gcSupport.getPossiblyDeletedDocs(SECONDS.toMillis(fromSeconds), SECONDS.toMillis(toSeconds));
+        assertEquals(num, Iterables.size(docs));
+    }
+}

Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java (revision 1786119)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java (working copy)
@@ -43,6 +43,9 @@
 
 import static java.util.concurrent.TimeUnit.HOURS;
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -134,6 +137,42 @@
         assertTrue(stats.get().canceled);
     }
 
+    @Test
+    public void cancelMustNotUpdateLastOldestTimeStamp() throws Exception {
+        // get previous entry from SETTINGS
+        String versionGCId = "versionGC";
+        String lastOldestTimeStampProp = "lastOldestTimeStamp";
+        Document statusBefore = store.find(Collection.SETTINGS, versionGCId);
+        // block gc call
+        store.semaphore.acquireUninterruptibly();
+        Future<VersionGCStats> stats = gc();
+        boolean gcBlocked = false;
+        for (int i = 0; i < 10; i ++) {
+            if (store.semaphore.hasQueuedThreads()) {
+                gcBlocked = true;
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertTrue(gcBlocked);
+        // now cancel the GC
+        gc.cancel();
+        store.semaphore.release();
+        assertTrue(stats.get().canceled);
+
+        // ensure a canceled GC doesn't update that versionGC SETTINGS entry
+        Document statusAfter = store.find(Collection.SETTINGS, versionGCId);
+        if (statusBefore == null) {
+            assertNull(statusAfter);
+        } else {
+            assertNotNull(statusAfter);
+            assertEquals(
+                    "canceled GC shouldn't change the " + lastOldestTimeStampProp + " property on " + versionGCId
+                            + " settings entry",
+                    statusBefore.get(lastOldestTimeStampProp), statusAfter.get(lastOldestTimeStampProp));
+        }
+    }
+
     private Future<VersionGCStats> gc() {
         // run gc in a separate thread
         return execService.submit(new Callable<VersionGCStats>() {
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java (revision 1786119)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java (working copy)
@@ -30,6 +30,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.collect.Iterables.filter;
@@ -86,6 +87,8 @@
 
     private Clock clock;
 
+    private DocumentMK.Builder documentMKBuilder;
+
     private DocumentNodeStore store;
 
     private VersionGarbageCollector gc;
@@ -121,12 +124,9 @@
         clock = new Clock.Virtual();
         clock.waitUntil(System.currentTimeMillis());
         Revision.setClock(clock);
-        store = new DocumentMK.Builder()
-                .clock(clock)
-                .setLeaseCheck(false)
-                .setDocumentStore(fixture.createDocumentStore())
-                .setAsyncDelay(0)
-                .getNodeStore();
+        documentMKBuilder = new DocumentMK.Builder().clock(clock).setLeaseCheck(false)
+                .setDocumentStore(fixture.createDocumentStore()).setAsyncDelay(0);
+        store = documentMKBuilder.getNodeStore();
         gc = store.getVersionGarbageCollector();
     }
 
@@ -453,8 +453,8 @@
         final BlockingQueue<NodeDocument> docs = Queues.newSynchronousQueue();
         VersionGCSupport gcSupport = new VersionGCSupport(store.getDocumentStore()) {
             @Override
-            public Iterable<NodeDocument> getPossiblyDeletedDocs(long lastModifiedTime) {
-                return filter(super.getPossiblyDeletedDocs(lastModifiedTime),
+            public Iterable<NodeDocument> getPossiblyDeletedDocs(long fromModified, long toModified) {
+                return filter(super.getPossiblyDeletedDocs(fromModified, toModified),
                         new Predicate<NodeDocument>() {
                     @Override
                     public boolean apply(NodeDocument input) {
@@ -622,10 +622,10 @@
         final AtomicReference<VersionGarbageCollector> gcRef = Atomics.newReference();
         VersionGCSupport gcSupport = new VersionGCSupport(store.getDocumentStore()) {
             @Override
-            public Iterable<NodeDocument> getPossiblyDeletedDocs(long lastModifiedTime) {
+            public Iterable<NodeDocument> getPossiblyDeletedDocs(long fromModified, long toModified) {
                 // cancel as soon as it runs
                 gcRef.get().cancel();
-                return super.getPossiblyDeletedDocs(lastModifiedTime);
+                return super.getPossiblyDeletedDocs(fromModified, toModified);
             }
         };
         gcRef.set(new VersionGarbageCollector(store, gcSupport));
@@ -655,12 +655,12 @@
         final AtomicReference<VersionGarbageCollector> gcRef = Atomics.newReference();
         VersionGCSupport gcSupport = new VersionGCSupport(store.getDocumentStore()) {
             @Override
-            public Iterable<NodeDocument> getPossiblyDeletedDocs(final long lastModifiedTime) {
+            public Iterable<NodeDocument> getPossiblyDeletedDocs(final long fromModified, final long toModified) {
                 return new Iterable<NodeDocument>() {
                     @Override
                     public Iterator<NodeDocument> iterator() {
                         return new AbstractIterator<NodeDocument>() {
-                            private Iterator<NodeDocument> it = candidates(lastModifiedTime);
+                            private Iterator<NodeDocument> it = candidates(fromModified, toModified);
                             @Override
                             protected NodeDocument computeNext() {
                                 if (it.hasNext()) {
@@ -675,8 +675,8 @@
                 };
             }
 
-            private Iterator<NodeDocument> candidates(long lastModifiedTime) {
-                return super.getPossiblyDeletedDocs(lastModifiedTime).iterator();
+            private Iterator<NodeDocument> candidates(long fromModified, long toModified) {
+                return super.getPossiblyDeletedDocs(fromModified, toModified).iterator();
             }
         };
         gcRef.set(new VersionGarbageCollector(store, gcSupport));
@@ -688,6 +688,59 @@
         assertEquals(0, stats.splitDocGCCount);
     }
 
+    // OAK-3070
+    @Test
+    public void lowerBoundOfModifiedDocs() throws Exception {
+        Revision.setClock(clock);
+        final VersionGCSupport fixtureGCSupport = documentMKBuilder.createVersionGCSupport();
+        final AtomicInteger docCounter = new AtomicInteger();
+        VersionGCSupport nonReportingGcSupport = new VersionGCSupport(store.getDocumentStore()) {
+            @Override
+            public Iterable<NodeDocument> getPossiblyDeletedDocs(final long fromModified, final long toModified) {
+                return filter(fixtureGCSupport.getPossiblyDeletedDocs(fromModified, toModified),
+                        new Predicate<NodeDocument>() {
+                            @Override
+                            public boolean apply(NodeDocument input) {
+                                docCounter.incrementAndGet();
+                                // count every candidate, but never report one as GC'able so
+                                // that no document is actually removed by this test
+                                return false;
+                            }
+                        });
+            }
+        };
+        final VersionGarbageCollector gc = new VersionGarbageCollector(store, nonReportingGcSupport);
+        final long maxAgeHours = 1;
+        final long clockDelta = HOURS.toMillis(maxAgeHours) + MINUTES.toMillis(5);
+
+        // create and delete node
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child("foo1");
+        merge(store, builder);
+        builder = store.getRoot().builder();
+        builder.getChildNode("foo1").remove();
+        merge(store, builder);
+        store.runBackgroundOperations();
+
+        clock.waitUntil(clock.getTime() + clockDelta);
+        gc.gc(maxAgeHours, HOURS);
+        assertEquals("Not all deletable docs got reported on first run", 1, docCounter.get());
+
+        docCounter.set(0);
+        // create and delete another node
+        builder = store.getRoot().builder();
+        builder.child("foo2");
+        merge(store, builder);
+        builder = store.getRoot().builder();
+        builder.getChildNode("foo2").remove();
+        merge(store, builder);
+        store.runBackgroundOperations();
+
+        // wait another hour and GC in last 1 hour
+        clock.waitUntil(clock.getTime() + clockDelta);
+        gc.gc(maxAgeHours, HOURS);
+        assertEquals("Only docs modified since the previous run's upper bound should be reported", 1, docCounter.get());
+    }
+
     private void createTestNode(String name) throws CommitFailedException {
         DocumentStore ds = store.getDocumentStore();
         NodeBuilder builder = store.getRoot().builder();