Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCOptions.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCOptions.java (nonexistent)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCOptions.java (working copy)
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.concurrent.TimeUnit;
+
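+/**
+ * Options for the revision garbage collector, as an immutable value object: every
+ * {@code with*} method returns a new instance. A minimal usage sketch, using only
+ * the methods defined below (values illustrative):
+ * <pre>{@code
+ * VersionGCOptions options = new VersionGCOptions()
+ *         .withCollectLimit(50000)
+ *         .withMaxDuration(TimeUnit.MINUTES, 30);
+ * }</pre>
+ */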
+public class VersionGCOptions {
+
+ public final int overflowToDiskThreshold;
+ public final long collectLimit;
+ public final long precisionMs;
+ public final int maxIterations;
+ public final long maxDurationMs;
+ public final double delayFactor;
+
+ public VersionGCOptions() {
+ this(100000, 100000, TimeUnit.MINUTES.toMillis(1),
+ 0, TimeUnit.HOURS.toMillis(0), 0);
+ }
+
+ private VersionGCOptions(int overflow, long collectLimit, long precisionMs,
+ int maxIterations, long maxDurationMs, double delayFactor) {
+ this.overflowToDiskThreshold = overflow;
+ this.collectLimit = collectLimit;
+ this.precisionMs = precisionMs;
+ this.maxIterations = maxIterations;
+ this.maxDurationMs = maxDurationMs;
+ this.delayFactor = delayFactor;
+ }
+
+ /**
+ * Sets the limit on the number of resource id+_modified strings (not their length) held in
+ * memory during a collection run. Any more will be stored and sorted in a temporary file.
+ *
+ * @param overflowToDiskThreshold limit after which to use file based storage for candidate ids
+ */
+ public VersionGCOptions withOverflowToDiskThreshold(int overflowToDiskThreshold) {
+ return new VersionGCOptions(overflowToDiskThreshold, this.collectLimit,
+ this.precisionMs, this.maxIterations, this.maxDurationMs, this.delayFactor);
+ }
+
+ /**
+ * Sets the absolute limit on the number of resource ids collected in one run. This does not
+ * count nodes which can be deleted immediately. When this limit is exceeded, the run either
+ * fails or is attempted with different parameters, depending on other settings. Note that if
+ * the inspected time interval is equal to or shorter than {@link #precisionMs}, the collection
+ * limit will be ignored.
+ *
+ * @param limit the absolute limit of resources collected in one run
+ */
+ public VersionGCOptions withCollectLimit(long limit) {
+ return new VersionGCOptions(this.overflowToDiskThreshold, limit,
+ this.precisionMs, this.maxIterations, this.maxDurationMs, this.delayFactor);
+ }
+
+ /**
+ * Sets the minimum duration used for time based searches. This should be at least the
+ * precision available on modification dates of documents, but can be set larger to avoid
+ * querying the database too often. Note however that {@link #collectLimit} will not take
+ * effect for runs that query an interval equal to or shorter than this precision.
+ *
+ * @param unit time unit used for duration
+ * @param t the number of units in the duration
+ */
+ public VersionGCOptions withPrecisionMs(TimeUnit unit, long t) {
+ return new VersionGCOptions(this.overflowToDiskThreshold, this.collectLimit,
+ unit.toMillis(t), this.maxIterations, this.maxDurationMs, this.delayFactor);
+ }
+
+ /**
+ * Sets the maximum duration in elapsed time that the garbage collection may take. Setting
+ * this to 0 means that no limit is imposed. A positive duration imposes a soft limit, i.e.
+ * the collection might take longer, but no further iteration will be attempted afterwards.
+ * See {@link #withMaxIterations(int)} on how to control the behaviour.
+ *
+ * @param unit time unit used for duration
+ * @param t the number of units in the duration
+ */
+ public VersionGCOptions withMaxDuration(TimeUnit unit, long t) {
+ return new VersionGCOptions(this.overflowToDiskThreshold, this.collectLimit,
+ this.precisionMs, this.maxIterations, unit.toMillis(t), this.delayFactor);
+ }
+
+ /**
+ * Sets the maximum number of iterations that will be attempted in a single run. A value
+ * of 0 means that there is no limit. Since the garbage collector uses iterations to find
+ * suitable time intervals and set sizes for cleanups, limiting the iterations is only
+ * recommended for setups where the collector is called often.
+ *
+ * @param max the maximum number of iterations allowed
+ */
+ public VersionGCOptions withMaxIterations(int max) {
+ return new VersionGCOptions(this.overflowToDiskThreshold, this.collectLimit,
+ this.precisionMs, max, this.maxDurationMs, this.delayFactor);
+ }
+
+ /**
+ * Sets a delay factor between batched database modifications. This rate limits the writes
+ * to the database by the garbage collector. The default is 0, i.e. no delay; this is
+ * recommended when garbage collection is done during a maintenance window while other
+ * system load is low.
+ *
+ * For factors > 0, the actual delay is the duration of the last batch modification times
+ * the factor. Example: 0.25 results in a 25% delay, i.e. a batch modification running
+ * 10 seconds is followed by a sleep of 2.5 seconds.
+ *
+ * @param f the factor used to calculate batch modification delays
+ */
+ public VersionGCOptions withDelayFactor(double f) {
+ return new VersionGCOptions(this.overflowToDiskThreshold, this.collectLimit,
+ this.precisionMs, this.maxIterations, this.maxDurationMs, f);
+ }
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCOptions.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java (revision 1789523)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupport.java (working copy)
@@ -29,11 +29,16 @@
import org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType;
import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
public class VersionGCSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(VersionGCSupport.class);
+
private final DocumentStore store;
public VersionGCSupport(DocumentStore store) {
@@ -105,4 +110,41 @@
}
});
}
+
+ /**
+ * Retrieve the time of the oldest document marked as '_deletedOnce'.
+ *
+ * @param clock the clock used to obtain the current time
+ * @param precisionMs the exact time may vary by the given precision
+ * @return the timestamp of the oldest document marked with '_deletedOnce',
+ *         modulo the given precision. If no such document exists, returns
+ *         the max time inspected (close to the current time).
+ */
+ public long getOldestDeletedOnceTimestamp(Clock clock, long precisionMs) {
+ long ts = 0;
+ long now = clock.getTime();
+ long duration = (now - ts) / 2;
+ Iterable<NodeDocument> docs;
+
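+ // Binary search: while the interval is wider than precisionMs, check
+ // whether [ts, ts + duration] contains any '_deletedOnce' candidate;
+ // if so, narrow into the lower half, otherwise skip past it.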
+ while (duration > precisionMs) {
+ // check for delete candidates in [ ts, ts + duration]
+ LOG.debug("find oldest _deletedOnce, check < {}", Utils.timestampToString(ts + duration));
+ docs = getPossiblyDeletedDocs(ts, ts + duration);
+ if (docs.iterator().hasNext()) {
+ // look if there are still nodes to be found in the lower half
+ duration /= 2;
+ } else {
+ // so, there are no delete candidates older than "ts + duration"
+ ts = ts + duration;
+ duration /= 2;
+ }
+ Utils.closeIfCloseable(docs);
+ }
+ LOG.debug("find oldest _deletedOnce to be {}", Utils.timestampToString(ts));
+ return ts;
+ }
+
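+ /**
+ * Returns the number of documents flagged with '_deletedOnce'. This is an
+ * optional operation; the default implementation throws
+ * {@link UnsupportedOperationException}, store-specific subclasses may
+ * override it with an efficient count.
+ */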
+ public long getDeletedOnceCount() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("getDeletedOnceCount()");
+ }
}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java (revision 1789523)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java (working copy)
@@ -46,9 +46,11 @@
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.StandardSystemProperty.LINE_SEPARATOR;
import static com.google.common.collect.Iterables.all;
@@ -56,23 +58,21 @@
import static com.google.common.util.concurrent.Atomics.newReference;
import static java.util.Collections.singletonMap;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.SETTINGS;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType.COMMIT_ROOT_ONLY;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType.DEFAULT_LEAF;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition.newEqualsCondition;
+/* TODO: add unit tests for TimeInterval */
+
public class VersionGarbageCollector {
+
//Kept less than MongoDocumentStore.IN_CLAUSE_BATCH_SIZE to avoid re-partitioning
private static final int DELETE_BATCH_SIZE = 450;
private static final int UPDATE_BATCH_SIZE = 450;
private static final int PROGRESS_BATCH_SIZE = 10000;
private static final Key KEY_MODIFIED = new Key(MODIFIED_IN_SECS, null);
- private final DocumentNodeStore nodeStore;
- private final DocumentStore ds;
- private final VersionGCSupport versionStore;
- private int overflowToDiskThreshold = 100000;
- private final AtomicReference<GCJob> collector = newReference();
-
private static final Logger log = LoggerFactory.getLogger(VersionGarbageCollector.class);
/**
@@ -91,22 +91,55 @@
*/
private static final String SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP = "lastOldestTimeStamp";
+ /**
+ * Property name to recommended time interval for next collection run
+ */
+ private static final String SETTINGS_COLLECTION_REC_INTERVAL_PROP = "recommendedIntervalMs";
+
+ private final DocumentNodeStore nodeStore;
+ private final DocumentStore ds;
+ private final VersionGCSupport versionStore;
+ private final AtomicReference<GCJob> collector = newReference();
+ private VersionGCOptions options;
+
VersionGarbageCollector(DocumentNodeStore nodeStore,
VersionGCSupport gcSupport) {
this.nodeStore = nodeStore;
this.versionStore = gcSupport;
this.ds = nodeStore.getDocumentStore();
-
- createSettingDocIfNotExist();
+ this.options = new VersionGCOptions();
}
public VersionGCStats gc(long maxRevisionAge, TimeUnit unit) throws IOException {
long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge);
- GCJob job = new GCJob(maxRevisionAgeInMillis);
+ TimeInterval maxRunTime = new TimeInterval(nodeStore.getClock().getTime(), Long.MAX_VALUE);
+ if (options.maxDurationMs > 0) {
+ maxRunTime = maxRunTime.startAndDuration(options.maxDurationMs);
+ }
+ GCJob job = new GCJob(maxRevisionAgeInMillis, options);
if (collector.compareAndSet(null, job)) {
+ VersionGCStats stats, overall = new VersionGCStats();
+ overall.active.start();
try {
- return job.run();
+ long averageDurationMs = 0;
+ while (maxRunTime.contains(nodeStore.getClock().getTime() + averageDurationMs)) {
+ log.info("start {}. run (avg duration {} sec)",
+ overall.iterationCount + 1, averageDurationMs / 1000.0);
+ stats = job.run();
+
+ overall.addRun(stats);
+ if (options.maxIterations > 0 && overall.iterationCount >= options.maxIterations) {
+ break;
+ }
+ if (!overall.needRepeat) {
+ break;
+ }
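+ // running average of iteration durations; used by the loop
+ // condition above to predict whether another iteration still
+ // fits into maxRunTime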
+ averageDurationMs = ((averageDurationMs * (overall.iterationCount - 1))
+ + stats.active.elapsed(TimeUnit.MILLISECONDS)) / overall.iterationCount;
+ }
+ return overall;
} finally {
+ overall.active.stop();
collector.set(null);
}
} else {
@@ -121,18 +154,67 @@
}
}
- public void setOverflowToDiskThreshold(int overflowToDiskThreshold) {
- this.overflowToDiskThreshold = overflowToDiskThreshold;
+ public VersionGCOptions getOptions() {
+ return this.options;
}
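+
+ /**
+ * Replaces the options used by future garbage collection runs, e.g.
+ * {@code gc.setOptions(gc.getOptions().withCollectLimit(100000))}.
+ *
+ * @param options the options to use
+ */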
+ public void setOptions(VersionGCOptions options) {
+ this.options = options;
+ }
+
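+ /**
+ * Removes the persisted GC settings document, so that the next run starts
+ * without a remembered timestamp or recommended interval.
+ */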
+ public void reset() {
+ ds.remove(SETTINGS, SETTINGS_COLLECTION_ID);
+ }
+
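+ /**
+ * Estimates, without collecting anything, what the next garbage collection
+ * run would do: the scope it would inspect, the number of delete candidates
+ * and the expected number of iterations.
+ *
+ * @param maxRevisionAge the minimum age for revisions to be collected
+ * @param unit the time unit for maxRevisionAge
+ */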
+ public VersionGCInfo getInfo(long maxRevisionAge, TimeUnit unit)
+ throws IOException {
+ long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge);
+ long now = nodeStore.getClock().getTime();
+ Recommendations rec = new Recommendations(maxRevisionAgeInMillis, options);
+ return new VersionGCInfo(rec.lastOldestTimestamp, rec.scope.fromMs,
+ rec.deleteCandidateCount, rec.maxCollect,
+ rec.suggestedIntervalMs, rec.scope.toMs,
+ (int) Math.ceil((now - rec.scope.toMs) / (double) rec.suggestedIntervalMs));
+ }
+
+ public static class VersionGCInfo {
+ public final long lastSuccess;
+ public final long oldestRevisionEstimate;
+ public final long revisionsCandidateCount;
+ public final long collectLimit;
+ public final long recommendedCleanupInterval;
+ public final long recommendedCleanupTimestamp;
+ public final int estimatedIterations;
+
+ VersionGCInfo(long lastSuccess,
+ long oldestRevisionEstimate,
+ long revisionsCandidateCount,
+ long collectLimit,
+ long recommendedCleanupInterval,
+ long recommendedCleanupTimestamp,
+ int estimatedIterations) {
+ this.lastSuccess = lastSuccess;
+ this.oldestRevisionEstimate = oldestRevisionEstimate;
+ this.revisionsCandidateCount = revisionsCandidateCount;
+ this.collectLimit = collectLimit;
+ this.recommendedCleanupInterval = recommendedCleanupInterval;
+ this.recommendedCleanupTimestamp = recommendedCleanupTimestamp;
+ this.estimatedIterations = estimatedIterations;
+ }
+ }
+
public static class VersionGCStats {
boolean ignoredGCDueToCheckPoint;
boolean canceled;
+ boolean limitExceeded;
+ boolean needRepeat;
+ int iterationCount;
int deletedDocGCCount;
int deletedLeafDocGCCount;
int splitDocGCCount;
int intermediateSplitDocGCCount;
int updateResurrectedGCCount;
+ final Stopwatch active = Stopwatch.createUnstarted();
final Stopwatch collectDeletedDocs = Stopwatch.createUnstarted();
final Stopwatch checkDeletedDocs = Stopwatch.createUnstarted();
final Stopwatch deleteDeletedDocs = Stopwatch.createUnstarted();
@@ -149,6 +231,8 @@
", updateResurrectedGCCount=" + updateResurrectedGCCount +
", splitDocGCCount=" + splitDocGCCount +
", intermediateSplitDocGCCount=" + intermediateSplitDocGCCount +
+ ", iterationCount=" + iterationCount +
+ ", timeActive=" + active +
", timeToCollectDeletedDocs=" + collectDeletedDocs +
", timeToCheckDeletedDocs=" + checkDeletedDocs +
", timeToSortDocIds=" + sortDocIds +
@@ -157,6 +241,20 @@
", timeTakenToCollectAndDeleteSplitDocs=" + collectAndDeleteSplitDocs +
'}';
}
+
+ void addRun(VersionGCStats run) {
+ ++iterationCount;
+ this.ignoredGCDueToCheckPoint = run.ignoredGCDueToCheckPoint;
+ this.canceled = run.canceled;
+ this.limitExceeded = run.limitExceeded;
+ this.needRepeat = run.needRepeat;
+ this.deletedDocGCCount += run.deletedDocGCCount;
+ this.deletedLeafDocGCCount += run.deletedLeafDocGCCount;
+ this.splitDocGCCount += run.splitDocGCCount;
+ this.intermediateSplitDocGCCount += run.intermediateSplitDocGCCount;
+ this.updateResurrectedGCCount += run.updateResurrectedGCCount;
+ /* TODO: add stopwatch values */
+ }
}
private enum GCPhase {
@@ -171,7 +269,7 @@
/**
* Keeps track of timers when switching GC phases.
- *
+ *
* Could be merged with VersionGCStats, however this way the public class is kept unchanged.
*/
private static class GCPhases {
@@ -182,8 +280,8 @@
private final Map<GCPhase, Stopwatch> watches = Maps.newHashMap();
private final AtomicBoolean canceled;
- GCPhases(AtomicBoolean canceled) {
- this.stats = new VersionGCStats();
+ GCPhases(AtomicBoolean canceled, VersionGCStats stats) {
+ this.stats = stats;
this.elapsed = Stopwatch.createStarted();
this.watches.put(GCPhase.NONE, Stopwatch.createStarted());
this.watches.put(GCPhase.COLLECTING, stats.collectDeletedDocs);
@@ -230,7 +328,7 @@
}
private GCPhase current() {
- return phases.isEmpty()? GCPhase.NONE : phases.get(phases.size() - 1);
+ return phases.isEmpty() ? GCPhase.NONE : phases.get(phases.size() - 1);
}
private Stopwatch currentWatch() {
@@ -242,6 +340,7 @@
w.start();
}
}
+
private void suspend(Stopwatch w) {
if (w.isRunning()) {
w.stop();
@@ -252,10 +351,12 @@
private class GCJob {
private final long maxRevisionAgeMillis;
+ private final VersionGCOptions options;
private AtomicBoolean cancel = new AtomicBoolean();
- GCJob(long maxRevisionAgeMillis) {
+ GCJob(long maxRevisionAgeMillis, VersionGCOptions options) {
this.maxRevisionAgeMillis = maxRevisionAgeMillis;
+ this.options = options;
}
VersionGCStats run() throws IOException {
@@ -268,44 +369,37 @@
}
private VersionGCStats gc(long maxRevisionAgeInMillis) throws IOException {
- GCPhases phases = new GCPhases(cancel);
- final long oldestRevTimeStamp = nodeStore.getClock().getTime() - maxRevisionAgeInMillis;
- final RevisionVector headRevision = nodeStore.getHeadRevision();
+ VersionGCStats stats = new VersionGCStats();
+ stats.active.start();
+ Recommendations rec = new Recommendations(maxRevisionAgeInMillis, options);
+ GCPhases phases = new GCPhases(cancel, stats);
+ try {
+ if (rec.ignoreDueToCheckPoint) {
+ phases.stats.ignoredGCDueToCheckPoint = true;
+ cancel.set(true);
+ } else {
+ final RevisionVector headRevision = nodeStore.getHeadRevision();
+ log.info("Looking at revisions in {}", rec.scope);
- final long lastOldestTimeStamp = getLastOldestTimeStamp();
-
- log.info("Starting revision garbage collection. Revisions older than [{}] and newer than [{}] will be removed",
- Utils.timestampToString(oldestRevTimeStamp), Utils.timestampToString(lastOldestTimeStamp));
-
- //Check for any registered checkpoint which prevent the GC from running
- Revision checkpoint = nodeStore.getCheckpoints().getOldestRevisionToKeep();
- if (checkpoint != null && checkpoint.getTimestamp() < oldestRevTimeStamp) {
- log.warn("Ignoring revision garbage collection because a valid " +
- "checkpoint [{}] was found, which is older than [{}].",
- checkpoint.toReadableString(),
- Utils.timestampToString(oldestRevTimeStamp)
- );
- phases.stats.ignoredGCDueToCheckPoint = true;
- return phases.stats;
+ collectDeletedDocuments(phases, headRevision, rec);
+ collectSplitDocuments(phases, rec);
+ }
+ } catch (LimitExceededException ex) {
+ stats.limitExceeded = true;
+ } finally {
+ phases.close();
+ stats.canceled = cancel.get();
}
- collectDeletedDocuments(phases, headRevision, lastOldestTimeStamp, oldestRevTimeStamp);
- collectSplitDocuments(phases, oldestRevTimeStamp);
-
- phases.close();
- phases.stats.canceled = cancel.get();
-
- if (!cancel.get()) {
- setLastOldestTimeStamp(oldestRevTimeStamp);
- }
-
- log.info("Revision garbage collection finished in {}. {}", phases.elapsed, phases.stats);
- return phases.stats;
+ rec.evaluate(stats);
+ log.info("Revision garbage collection finished in {}. {}", phases.elapsed, stats);
+ stats.active.stop();
+ return stats;
}
- private void collectSplitDocuments(GCPhases phases, long oldestRevTimeStamp) {
+ private void collectSplitDocuments(GCPhases phases, Recommendations rec) {
if (phases.start(GCPhase.SPLITS_CLEANUP)) {
- versionStore.deleteSplitDocuments(GC_TYPES, oldestRevTimeStamp, phases.stats);
+ versionStore.deleteSplitDocuments(GC_TYPES, rec.scope.toMs, phases.stats);
phases.stop(GCPhase.SPLITS_CLEANUP);
}
}
@@ -312,14 +406,13 @@
private void collectDeletedDocuments(GCPhases phases,
RevisionVector headRevision,
- final long lastOldestTimeStamp,
- final long oldestRevTimeStamp)
- throws IOException {
+ Recommendations rec)
+ throws IOException, LimitExceededException {
int docsTraversed = 0;
- DeletedDocsGC gc = new DeletedDocsGC(headRevision, cancel);
+ DeletedDocsGC gc = new DeletedDocsGC(headRevision, cancel, options);
try {
- if (phases.start(GCPhase.COLLECTING)) {
- Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(lastOldestTimeStamp, oldestRevTimeStamp);
+ if (phases.start(GCPhase.COLLECTING)) {
+ Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(rec.scope.fromMs, rec.scope.toMs);
try {
for (NodeDocument doc : itr) {
// continue with GC?
@@ -331,7 +424,7 @@
// this node has not be revived again in past maxRevisionAge
// So deleting it is safe
docsTraversed++;
- if (docsTraversed % PROGRESS_BATCH_SIZE == 0){
+ if (docsTraversed % PROGRESS_BATCH_SIZE == 0) {
log.info("Iterated through {} documents so far. {} found to be deleted",
docsTraversed, gc.getNumDocuments());
}
@@ -339,6 +432,9 @@
gc.possiblyDeleted(doc);
phases.stop(GCPhase.CHECKING);
}
+ if (rec.maxCollect > 0 && gc.docIdsToDelete.getSize() > rec.maxCollect) {
+ throw new LimitExceededException();
+ }
if (gc.hasLeafBatch()) {
if (phases.start(GCPhase.DELETING)) {
gc.removeLeafDocuments(phases.stats);
@@ -385,25 +481,6 @@
}
}
- private long getLastOldestTimeStamp() {
- Document versionGCDoc = ds.find(Collection.SETTINGS, SETTINGS_COLLECTION_ID, 0);
- return (Long) versionGCDoc.get(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP);
- }
-
- private void setLastOldestTimeStamp(long lastGCRunTime) {
- UpdateOp updateOp = new UpdateOp(SETTINGS_COLLECTION_ID, false);
- updateOp.set(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP, lastGCRunTime);
- ds.createOrUpdate(Collection.SETTINGS, updateOp);
- }
-
- private void createSettingDocIfNotExist() {
- if (ds.find(Collection.SETTINGS, SETTINGS_COLLECTION_ID) == null) {
- UpdateOp updateOp = new UpdateOp(SETTINGS_COLLECTION_ID, true);
- updateOp.set(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP, 0);
- ds.create(Collection.SETTINGS, Lists.newArrayList(updateOp));
- }
- }
-
/**
* A helper class to remove document for deleted nodes.
*/
@@ -413,15 +490,22 @@
private final AtomicBoolean cancel;
private final List<String> leafDocIdsToDelete = Lists.newArrayList();
private final List<String> resurrectedIds = Lists.newArrayList();
- private final StringSort docIdsToDelete = newStringSort();
- private final StringSort prevDocIdsToDelete = newStringSort();
+ private final StringSort docIdsToDelete;
+ private final StringSort prevDocIdsToDelete;
private final Set<String> exclude = Sets.newHashSet();
private boolean sorted = false;
+ private final Stopwatch timer;
+ private final VersionGCOptions options;
public DeletedDocsGC(@Nonnull RevisionVector headRevision,
- @Nonnull AtomicBoolean cancel) {
+ @Nonnull AtomicBoolean cancel,
+ @Nonnull VersionGCOptions options) {
this.headRevision = checkNotNull(headRevision);
this.cancel = checkNotNull(cancel);
+ this.timer = Stopwatch.createUnstarted();
+ this.options = options;
+ this.docIdsToDelete = newStringSort(options);
+ this.prevDocIdsToDelete = newStringSort(options);
}
/**
@@ -441,8 +525,9 @@
* them together with associated previous document
*
* @param doc the candidate document.
+ * @return true iff document is scheduled for deletion
*/
- void possiblyDeleted(NodeDocument doc)
+ boolean possiblyDeleted(NodeDocument doc)
throws IOException {
// construct an id that also contains
// the _modified time of the document
@@ -452,7 +537,7 @@
Utils.getDepthFromId(id);
} catch (IllegalArgumentException e) {
log.warn("Invalid GC id {} for document {}", id, doc);
- return;
+ return false;
}
if (doc.getNodeAtRevision(nodeStore, headRevision, null) == null) {
// Collect id of all previous docs also
@@ -463,9 +548,11 @@
addDocument(id);
addPreviousDocuments(previousDocs);
}
+ return true;
} else {
addNonDeletedDocument(id);
}
+ return false;
}
/**
@@ -521,6 +608,19 @@
//------------------------------< internal >----------------------------
+ private void delayOnModifications(long durationMs) {
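+ // sleep in proportion to the duration of the last batch write,
+ // as configured via VersionGCOptions#withDelayFactor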
+ long delayMs = Math.round(durationMs * options.delayFactor);
+ if (!cancel.get() && delayMs > 0) {
+ try {
+ Clock clock = nodeStore.getClock();
+ clock.waitUntil(clock.getTime() + delayMs);
+ } catch (InterruptedException ex) {
+ /* ignore */
+ }
+ }
+ }
+
private Iterator<String> previousDocIdsFor(NodeDocument doc) {
Map<Revision, Range> prevRanges = doc.getPreviousRanges(true);
if (prevRanges.isEmpty()) {
@@ -609,7 +709,8 @@
private int removeDeletedDocuments(Iterator<String> docIdsToDelete,
long numDocuments,
String label) throws IOException {
log.info("Proceeding to delete [{}] documents [{}]", numDocuments, label);
Iterator<List<String>> idListItr = partition(docIdsToDelete, DELETE_BATCH_SIZE);
@@ -629,34 +730,39 @@
deletionBatch.put(parsed.getKey(), singletonMap(KEY_MODIFIED, newEqualsCondition(parsed.getValue())));
}
- if (log.isDebugEnabled()) {
+ if (log.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("Performing batch deletion of documents with following ids. \n");
Joiner.on(LINE_SEPARATOR.value()).appendTo(sb, deletionBatch.keySet());
- log.debug(sb.toString());
+ log.trace(sb.toString());
}
- int nRemoved = ds.remove(NODES, deletionBatch);
+ timer.reset().start();
+ try {
+ int nRemoved = ds.remove(NODES, deletionBatch);
- if (nRemoved < deletionBatch.size()) {
- // some nodes were re-created while GC was running
- // find the document that still exist
- for (String id : deletionBatch.keySet()) {
- NodeDocument d = ds.find(NODES, id);
- if (d != null) {
- concurrentModification(d);
+ if (nRemoved < deletionBatch.size()) {
+ // some nodes were re-created while GC was running
+ // find the document that still exist
+ for (String id : deletionBatch.keySet()) {
+ NodeDocument d = ds.find(NODES, id);
+ if (d != null) {
+ concurrentModification(d);
+ }
}
+ recreatedCount += (deletionBatch.size() - nRemoved);
}
- recreatedCount += (deletionBatch.size() - nRemoved);
- }
- deletedCount += nRemoved;
- log.debug("Deleted [{}] documents so far", deletedCount);
+ deletedCount += nRemoved;
+ log.debug("Deleted [{}] documents so far", deletedCount);
- if (deletedCount + recreatedCount - lastLoggedCount >= PROGRESS_BATCH_SIZE){
- lastLoggedCount = deletedCount + recreatedCount;
- double progress = lastLoggedCount * 1.0 / getNumDocuments() * 100;
- String msg = String.format("Deleted %d (%1.2f%%) documents so far", deletedCount, progress);
- log.info(msg);
+ if (deletedCount + recreatedCount - lastLoggedCount >= PROGRESS_BATCH_SIZE) {
+ lastLoggedCount = deletedCount + recreatedCount;
+ double progress = lastLoggedCount * 1.0 / getNumDocuments() * 100;
+ String msg = String.format("Deleted %d (%1.2f%%) documents so far", deletedCount, progress);
+ log.info(msg);
+ }
+ } finally {
+ delayOnModifications(timer.stop().elapsed(TimeUnit.MILLISECONDS));
}
}
return deletedCount;
@@ -666,24 +772,30 @@
log.info("Proceeding to reset [{}] _deletedOnce flags", resurrectedDocuments.size());
int updateCount = 0;
- for (String s : resurrectedDocuments) {
- if (!cancel.get()) {
- try {
- Map.Entry parsed = parseEntry(s);
- UpdateOp up = new UpdateOp(parsed.getKey(), false);
- up.equals(MODIFIED_IN_SECS, parsed.getValue());
- up.remove(NodeDocument.DELETED_ONCE);
- NodeDocument r = ds.findAndUpdate(Collection.NODES, up);
- if (r != null) {
- updateCount += 1;
+ timer.reset().start();
+ try {
+ for (String s : resurrectedDocuments) {
+ if (!cancel.get()) {
+ try {
+ Map.Entry parsed = parseEntry(s);
+ UpdateOp up = new UpdateOp(parsed.getKey(), false);
+ up.equals(MODIFIED_IN_SECS, parsed.getValue());
+ up.remove(NodeDocument.DELETED_ONCE);
+ NodeDocument r = ds.findAndUpdate(Collection.NODES, up);
+ if (r != null) {
+ updateCount += 1;
+ }
+ } catch (IllegalArgumentException ex) {
+ log.warn("Invalid _modified suffix for {}", s);
+ } catch (DocumentStoreException ex) {
+ log.warn("updating {}: {}", s, ex.getMessage());
}
- } catch (IllegalArgumentException ex) {
- log.warn("Invalid _modified suffix for {}", s);
- } catch (DocumentStoreException ex) {
- log.warn("updating {}: {}", s, ex.getMessage());
}
}
}
+ finally {
+ delayOnModifications(timer.stop().elapsed(TimeUnit.MILLISECONDS));
+ }
return updateCount;
}
@@ -708,7 +820,7 @@
log.debug("Deleted [{}] previous documents so far", deletedCount);
- if (deletedCount - lastLoggedCount >= PROGRESS_BATCH_SIZE){
+ if (deletedCount - lastLoggedCount >= PROGRESS_BATCH_SIZE) {
lastLoggedCount = deletedCount;
double progress = deletedCount * 1.0 / (prevDocIdsToDelete.getSize() - exclude.size()) * 100;
String msg = String.format("Deleted %d (%1.2f%%) previous documents so far", deletedCount, progress);
@@ -751,9 +863,8 @@
}
@Nonnull
- private StringSort newStringSort() {
- return new StringSort(overflowToDiskThreshold,
- NodeDocumentIdComparator.INSTANCE);
+ private StringSort newStringSort(VersionGCOptions options) {
+ return new StringSort(options.overflowToDiskThreshold, NodeDocumentIdComparator.INSTANCE);
}
private static final Predicate<Range> FIRST_LEVEL = new Predicate<Range>() {
@@ -762,4 +873,234 @@
return input != null && input.height == 0;
}
};
+
+ private class Recommendations {
+ final boolean ignoreDueToCheckPoint;
+ final TimeInterval scope;
+ final long maxCollect;
+ final long deleteCandidateCount;
+ final long lastOldestTimestamp;
+
+ private final long precisionMs;
+ private final long suggestedIntervalMs;
+ private final boolean scopeIsComplete;
+
+ /**
+ * Gives a recommendation about parameters for the next revision garbage collection run.
+ *
+ * With the given maximum age of revisions to keep (earliest time in the past to collect),
+ * the desired precision in which times shall be sliced and the given limit on the number
+ * of collected documents in one run, calculate
+ * - if gc shall run at all (ignoreDueToCheckPoint)
+ * - in which time interval documents shall be collected (scope)
+ * - whether collection shall fail once it reaches maxCollect documents; maxCollect specifies
+ * the limit, or is 0 if no limit shall be enforced
+ *
+ * After a run, evaluate() examines the result of the gc and updates the persisted
+ * recommendations for future runs.
+ *
+ * In the settings collection, the recommendations keep the end of the last successfully
+ * collected time interval, as well as the interval length recommended for the next run.
+ *
+ * @param maxRevisionAgeMs the minimum age for revisions to be collected
+ * @param options options for running the gc
+ */
+ Recommendations(long maxRevisionAgeMs, VersionGCOptions options) {
+ TimeInterval keep = new TimeInterval(nodeStore.getClock().getTime() - maxRevisionAgeMs, Long.MAX_VALUE);
+ boolean ignoreDueToCheckPoint = false;
+ long deletedOnceCount = 0;
+ long suggestedIntervalMs;
+ long oldestPossible;
+ long collectLimit = options.collectLimit;
+
+ lastOldestTimestamp = getLongSetting(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP);
+ if (lastOldestTimestamp == 0) {
+ log.debug("No lastOldestTimestamp found, querying for the oldest deletedOnce candidate");
+ oldestPossible = versionStore.getOldestDeletedOnceTimestamp(nodeStore.getClock(), options.precisionMs) - 1;
+ log.debug("oldest deletedOnce candidate: {}", Utils.timestampToString(oldestPossible));
+ } else {
+ oldestPossible = lastOldestTimestamp - 1;
+ }
+
+ TimeInterval scope = new TimeInterval(oldestPossible, Long.MAX_VALUE);
+ scope = scope.notLaterThan(keep.fromMs);
+
+ suggestedIntervalMs = getLongSetting(SETTINGS_COLLECTION_REC_INTERVAL_PROP);
+ if (suggestedIntervalMs > 0) {
+ suggestedIntervalMs = Math.max(suggestedIntervalMs, options.precisionMs);
+ if (suggestedIntervalMs < scope.getDurationMs()) {
+ scope = scope.startAndDuration(suggestedIntervalMs);
+ log.debug("previous runs recommend a {} sec duration, scope now {}",
+ TimeUnit.MILLISECONDS.toSeconds(suggestedIntervalMs), scope);
+ }
+ } else {
+ /* Need to guess. Count the overall number of _deletedOnce documents. If those
+ * are more than we want to collect in a single run, reduce the time scope so
+ * that we likely see a fitting fraction of those documents.
+ */
+ try {
+ long preferredLimit = Math.min(collectLimit, (long)Math.ceil(options.overflowToDiskThreshold * 0.95));
+ deletedOnceCount = versionStore.getDeletedOnceCount();
+ if (deletedOnceCount > preferredLimit) {
+ double chunks = ((double) deletedOnceCount) / preferredLimit;
+ suggestedIntervalMs = (long) Math.floor((scope.getDurationMs() + maxRevisionAgeMs) / chunks);
+ if (suggestedIntervalMs < scope.getDurationMs()) {
+ scope = scope.startAndDuration(suggestedIntervalMs);
+ log.debug("deletedOnce candidates: {} found, {} preferred, scope now {}",
+ deletedOnceCount, preferredLimit, scope);
+ }
+ }
+ } catch (UnsupportedOperationException ex) {
+ log.debug("check on upper bounds of delete candidates not supported, skipped");
+ }
+ }
+
+ //Check for any registered checkpoint which prevent the GC from running
+ Revision checkpoint = nodeStore.getCheckpoints().getOldestRevisionToKeep();
+ if (checkpoint != null && scope.endsAfter(checkpoint.getTimestamp())) {
+ TimeInterval minimalScope = scope.startAndDuration(options.precisionMs);
+ if (minimalScope.endsAfter(checkpoint.getTimestamp())) {
+ log.warn("Ignoring RGC run because a valid checkpoint [{}] exists inside minimal scope {}.",
+ checkpoint.toReadableString(), minimalScope);
+ ignoreDueToCheckPoint = true;
+ } else {
+ scope = scope.notLaterThan(checkpoint.getTimestamp() - 1);
+ log.debug("checkpoint at [{}] found, scope now {}",
+ Utils.timestampToString(checkpoint.getTimestamp()), scope);
+ }
+ }
+
+ if (scope.getDurationMs() <= options.precisionMs) {
+ // If we have narrowed the collect time interval down as much as we can, no
+ // longer enforce a limit. We need to get through this.
+ collectLimit = 0;
+ log.debug("time interval <= precision ({} ms), disabling collection limits", options.precisionMs);
+ }
+
+ this.precisionMs = options.precisionMs;
+ this.ignoreDueToCheckPoint = ignoreDueToCheckPoint;
+ this.scope = scope;
+ this.scopeIsComplete = scope.toMs >= keep.fromMs;
+ this.maxCollect = collectLimit;
+ this.suggestedIntervalMs = suggestedIntervalMs;
+ this.deleteCandidateCount = deletedOnceCount;
+ }
+
+ /**
+ * Evaluate the results of the last run. Update recommendations for future runs.
+ * Will set {@link VersionGCStats#needRepeat} if collection needs to run another
+ * iteration for collecting documents up to "now".
+ *
+ * @param stats the statistics from the last run
+ */
+ public void evaluate(VersionGCStats stats) {
+ if (stats.limitExceeded) {
+ // if the limit was exceeded, slash the recommended interval in half.
+ long nextDuration = Math.max(precisionMs, scope.getDurationMs() / 2);
+ log.info("Limit {} documents exceeded, reducing next collection interval to {} seconds",
+ this.maxCollect, TimeUnit.MILLISECONDS.toSeconds(nextDuration));
+ setLongSetting(SETTINGS_COLLECTION_REC_INTERVAL_PROP, nextDuration);
+ stats.needRepeat = true;
+ } else if (!stats.canceled && !stats.ignoredGCDueToCheckPoint) {
+ // success, we would not expect to encounter revisions older than this in the future
+ setLongSetting(SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP, scope.toMs);
+
+ if (maxCollect <= 0) {
+ log.debug("successful run without effective limit, keeping recommendations");
+ } else if (scope.getDurationMs() == suggestedIntervalMs) {
+ int count = stats.deletedDocGCCount - stats.deletedLeafDocGCCount;
+ double used = count / (double) maxCollect;
+ if (used < 0.66) {
+ long nextDuration = (long) Math.ceil(suggestedIntervalMs * 1.5);
+ log.debug("successful run using {}% of limit, raising recommended interval to {} seconds",
+ Math.round(used*1000)/10.0, TimeUnit.MILLISECONDS.toSeconds(nextDuration));
+ setLongSetting(SETTINGS_COLLECTION_REC_INTERVAL_PROP, nextDuration);
+ }
+ } else {
+ log.debug("successful run not following recommendations, keeping them");
+ }
+ stats.needRepeat = !scopeIsComplete;
+ }
+ }
+
+ private long getLongSetting(String propName) {
+ Document versionGCDoc = ds.find(Collection.SETTINGS, SETTINGS_COLLECTION_ID, 0);
+ if (versionGCDoc != null) {
+ Long l = (Long) versionGCDoc.get(propName);
+ if (l != null) {
+ return l;
+ }
+ }
+ return 0;
+ }
+
+ private void setLongSetting(String propName, long val) {
+ UpdateOp updateOp = new UpdateOp(SETTINGS_COLLECTION_ID,
+ (ds.find(Collection.SETTINGS, SETTINGS_COLLECTION_ID) == null));
+ updateOp.set(propName, val);
+ ds.createOrUpdate(Collection.SETTINGS, updateOp);
+ }
+ }
+
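+ /**
+ * A closed interval [fromMs, toMs] on the time axis, with fromMs <= toMs.
+ * A small sketch of the operations defined below (values illustrative):
+ * <pre>{@code
+ * TimeInterval scope = new TimeInterval(0, Long.MAX_VALUE);
+ * scope = scope.notLaterThan(10000);    // [0, 10000]
+ * scope = scope.startAndDuration(1000); // [0, 1000]
+ * scope.contains(500);                  // true
+ * }</pre>
+ */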
+ public static class TimeInterval {
+ public final long fromMs;
+ public final long toMs;
+
+ public TimeInterval(long fromMs, long toMs) {
+ checkArgument(fromMs <= toMs);
+ this.fromMs = fromMs;
+ this.toMs = toMs;
+ }
+
+ public TimeInterval notLaterThan(long timestampMs) {
+ if (timestampMs < toMs) {
+ return new TimeInterval((timestampMs < fromMs) ? timestampMs : fromMs, timestampMs);
+ }
+ return this;
+ }
+
+ public TimeInterval notEarlierThan(long timestampMs) {
+ if (fromMs < timestampMs) {
+ return new TimeInterval(timestampMs, (timestampMs > toMs) ? timestampMs : toMs);
+ }
+ return this;
+ }
+
+ public TimeInterval startAndDuration(long durationMs) {
+ return new TimeInterval(fromMs, fromMs + durationMs);
+ }
+
+ public long getDurationMs() {
+ return toMs - fromMs;
+ }
+
+ public boolean contains(long timestampMs) {
+ return fromMs <= timestampMs && timestampMs <= toMs;
+ }
+
+ public boolean endsAfter(long timestampMs) {
+ return toMs > timestampMs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TimeInterval) {
+ return fromMs == ((TimeInterval) o).fromMs && toMs == ((TimeInterval) o).toMs;
+ }
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (fromMs ^ (fromMs >>> 32) ^ toMs ^ (toMs >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "[" + Utils.timestampToString(fromMs) + ", " + Utils.timestampToString(toMs) + "]";
+ }
+ }
+
+ private static final class LimitExceededException extends Exception {
+ }
}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java (revision 1789523)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java (working copy)
@@ -19,6 +19,8 @@
package org.apache.jackrabbit.oak.plugins.document.mongo;
+import com.mongodb.client.model.DBCollectionCountOptions;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -33,6 +35,7 @@
import com.mongodb.DBObject;
import com.mongodb.QueryBuilder;
import com.mongodb.ReadPreference;
+import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.SplitDocumentCleanUp;
@@ -39,6 +42,8 @@
import org.apache.jackrabbit.oak.plugins.document.VersionGCSupport;
import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +92,14 @@
}
@Override
+ public long getDeletedOnceCount() {
+ DBObject query = start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE).get();
+ DBCollectionCountOptions options = new DBCollectionCountOptions();
+ options.readPreference(ReadPreference.secondaryPreferred());
+ return getNodeCollection().count(query, options);
+ }
+
+ @Override
protected SplitDocumentCleanUp createCleanUp(Set<SplitDocType> gcTypes,
long oldestRevTimeStamp,
VersionGCStats stats) {
@@ -105,6 +118,35 @@
});
}
+ @Override
+ public long getOldestDeletedOnceTimestamp(Clock clock, long precisionMs) {
+ LOG.debug("getOldestDeletedOnceTimestamp() <- start");
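+ // the oldest '_deletedOnce' document is found by sorting on
+ // '_modified' ascending and fetching a single result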
+ DBObject query = start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE).get();
+ DBCursor cursor = getNodeCollection().find(query).sort(start(NodeDocument.MODIFIED_IN_SECS).is(1).get()).limit(1);
+ CloseableIterable<NodeDocument> results = CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() {
+ @Override
+ public NodeDocument apply(DBObject input) {
+ return store.convertFromDBObject(NODES, input);
+ }
+ }), cursor);
+ try {
+ Iterator<NodeDocument> i = results.iterator();
+ if (i.hasNext()) {
+ NodeDocument doc = i.next();
+ long modifiedMs = doc.getModified() * TimeUnit.SECONDS.toMillis(1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getOldestDeletedOnceTimestamp() -> {}", Utils.timestampToString(modifiedMs));
+ }
+ return modifiedMs;
+ }
+ } finally {
+ Utils.closeIfCloseable(results);
+ }
+ LOG.debug("getOldestDeletedOnceTimestamp() -> none found, return current time");
+ return clock.getTime();
+ }
+
private DBObject createQuery(Set<SplitDocType> gcTypes,
long oldestRevTimeStamp) {
//OR condition has to be first as we have a index for that
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCDeletionTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCDeletionTest.java (revision 1789523)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCDeletionTest.java (working copy)
@@ -130,6 +130,7 @@
.setDocumentStore(ts)
.setAsyncDelay(0)
.getNodeStore();
+ Revision.setClock(clock);
//Baseline the clock
clock.waitUntil(Revision.getCurrentTimestamp());
@@ -205,7 +206,7 @@
//3. Check that deleted doc does get collected post maxAge
clock.waitUntil(clock.getTime() + HOURS.toMillis(maxAge*2) + delta);
VersionGarbageCollector gc = store.getVersionGarbageCollector();
- gc.setOverflowToDiskThreshold(100);
+ gc.setOptions(gc.getOptions().withOverflowToDiskThreshold(100));
VersionGCStats stats = gc.gc(maxAge * 2, HOURS);
assertEquals(noOfDocsToDelete * 2 + 1, stats.deletedDocGCCount);
@@ -254,7 +255,7 @@
//3. Check that deleted doc does get collected post maxAge
clock.waitUntil(clock.getTime() + HOURS.toMillis(maxAge*2) + delta);
VersionGarbageCollector gc = store.getVersionGarbageCollector();
- gc.setOverflowToDiskThreshold(100);
+ gc.setOptions(gc.getOptions().withOverflowToDiskThreshold(100));
VersionGCStats stats = gc.gc(maxAge * 2, HOURS);
assertEquals(noOfDocsToDelete * 2 + 1, stats.deletedDocGCCount);
@@ -403,6 +404,170 @@
assertEquals(expected, names);
}
+ @Test
+ public void deleteLimits() throws Exception{
+ int noOfDocsToDelete = 1000;
+ DocumentStore ts = new MemoryDocumentStore();
+ store = new DocumentMK.Builder()
+ .clock(clock)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setAsyncDelay(0)
+ .getNodeStore();
+
+ //Baseline the clock
+ clock.waitUntil(Revision.getCurrentTimestamp());
+
+ NodeBuilder b1 = store.getRoot().builder();
+ NodeBuilder xb = b1.child("x");
+ for (int i = 0; i < noOfDocsToDelete; i++){
+ xb.child("a"+i).child("b"+i);
+ }
+ store.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ long maxAge = 1; //hours
+ long delta = TimeUnit.MINUTES.toMillis(10);
+
+ NodeBuilder b2 = store.getRoot().builder();
+ b2.child("x").remove();
+ store.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ store.runBackgroundOperations();
+
+ clock.waitUntil(clock.getTime() + HOURS.toMillis(maxAge*2) + delta);
+ VersionGarbageCollector gc = store.getVersionGarbageCollector();
+ // The first attempt will hit the collection limit and fail.
+ // However repeated attempts will finally succeed and cleanup everything
+ gc.setOptions(gc.getOptions().withCollectLimit(100).withMaxIterations(1));
+
+ VersionGCStats stats = gc.gc(maxAge * 2, HOURS);
+ assertEquals(true, stats.limitExceeded);
+
+ gc.setOptions(gc.getOptions().withMaxIterations(0).withMaxDuration(TimeUnit.MINUTES, 5));
+
+ stats = gc.gc(maxAge * 2, HOURS);
+ assertEquals(false, stats.canceled);
+ assertEquals(noOfDocsToDelete * 2 + 1, stats.deletedDocGCCount);
+ assertEquals(noOfDocsToDelete, stats.deletedLeafDocGCCount);
+
+ assertNull(ts.find(Collection.NODES, "1:/x"));
+ for (int i = 0; i < noOfDocsToDelete; i++){
+ assertNull(ts.find(Collection.NODES, "2:/a"+i+"/b"+i));
+ assertNull(ts.find(Collection.NODES, "1:/a"+i));
+ }
+ }
+
+ @Test
+ public void deleteLimitChunks() throws Exception{
+ int noOfDocsInChunk = 100;
+ int noOfChunks = 10;
+ DocumentStore ts = new MemoryDocumentStore();
+ store = new DocumentMK.Builder()
+ .clock(clock)
+ .setDocumentStore(ts)
+ .setAsyncDelay(0)
+ .getNodeStore();
+ Revision.setClock(clock);
+
+ //Baseline the clock
+ clock.waitUntil(Revision.getCurrentTimestamp());
+
+ for (int i = 0; i < noOfChunks; ++i) {
+ NodeBuilder b1 = store.getRoot().builder();
+ NodeBuilder xb = b1.child("x"+i);
+ for (int j = 0; j < noOfDocsInChunk; ++j) {
+ xb.child("a"+j).child("b"+j);
+ }
+ store.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ clock.waitUntil(Revision.getCurrentTimestamp() + HOURS.toMillis(1));
+ }
+
+ long maxAge = 2; //hours
+
+ for (int i = 0; i < noOfChunks; ++i) {
+ NodeBuilder b2 = store.getRoot().builder();
+ b2.child("x"+i).remove();
+ merge(store, b2);
+ clock.waitUntil(clock.getTime() + HOURS.toMillis(1));
+ }
+
+ clock.waitUntil(Revision.getCurrentTimestamp() + HOURS.toMillis(maxAge));
+ store.runBackgroundOperations();
+
+ VersionGarbageCollector gc = store.getVersionGarbageCollector();
+ gc.setOptions(gc.getOptions().withCollectLimit(101).withMaxIterations(1));
+
+ // The first attempt will hit the collection limit and fail.
+ // However repeated attempts will finally succeed and cleanup a chunk of docs
+ VersionGCStats stats = gc.gc(maxAge, HOURS);
+ assertEquals(true, stats.limitExceeded);
+ for (int i = 0; i < noOfChunks; /**/) {
+ stats = gc.gc(maxAge, HOURS);
+ if (stats.deletedLeafDocGCCount > 0) {
+ assertEquals(false, stats.canceled);
+ assertEquals(noOfDocsInChunk * 2 + 1, stats.deletedDocGCCount);
+ assertEquals(noOfDocsInChunk, stats.deletedLeafDocGCCount);
+ ++i;
+ }
+ }
+
+ for (int i = 0; i < noOfChunks; ++i) {
+ assertNull(ts.find(Collection.NODES, "1:/x"+i));
+ }
+ for (int i = 0; i < noOfDocsInChunk; ++i) {
+ assertNull(ts.find(Collection.NODES, "2:/a" + i + "/b" + i));
+ assertNull(ts.find(Collection.NODES, "1:/a" + i));
+ }
+ }
+
+ @Test
+ public void deleteChunksInIterations() throws Exception{
+ int noOfDocsInChunk = 100;
+ int noOfChunks = 10;
+ store = new DocumentMK.Builder()
+ .clock(clock)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setAsyncDelay(0)
+ .getNodeStore();
+ Revision.setClock(clock);
+
+ //Baseline the clock
+ clock.waitUntil(Revision.getCurrentTimestamp());
+
+ for (int i = 0; i < noOfChunks; ++i) {
+ NodeBuilder b1 = store.getRoot().builder();
+ NodeBuilder xb = b1.child("x"+i);
+ for (int j = 0; j < noOfDocsInChunk; ++j){
+ xb.child("a"+j).child("b"+j);
+ }
+ store.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ clock.waitUntil(Revision.getCurrentTimestamp() + HOURS.toMillis(1));
+ }
+
+ long maxAge = 2; //hours
+
+ for (int i = 0; i < noOfChunks; ++i) {
+ NodeBuilder b2 = store.getRoot().builder();
+ b2.child("x"+i).remove();
+ merge(store, b2);
+ clock.waitUntil(clock.getTime() + HOURS.toMillis(1));
+ }
+
+ clock.waitUntil(Revision.getCurrentTimestamp() + HOURS.toMillis(maxAge));
+ store.runBackgroundOperations();
+
+ VersionGarbageCollector gc = store.getVersionGarbageCollector();
+ // limit chunk size and allow unlimited iterations
+ gc.setOptions(gc.getOptions().withCollectLimit(101).withMaxIterations(0));
+ VersionGCStats stats = gc.gc(maxAge, HOURS);
+
+ // All should be cleaned up now in > nOfChunks iterations
+ assertEquals(false, stats.limitExceeded);
+ assertEquals(false, stats.canceled);
+ assertEquals((noOfDocsInChunk * 2 + 1) * noOfChunks, stats.deletedDocGCCount);
+ assertEquals(noOfDocsInChunk * noOfChunks, stats.deletedLeafDocGCCount);
+ assertTrue(stats.iterationCount > noOfChunks);
+ }
+
private void merge(DocumentNodeStore store, NodeBuilder builder)
throws CommitFailedException {
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCInitTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCInitTest.java (revision 1789523)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCInitTest.java (working copy)
@@ -25,6 +25,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import java.util.concurrent.TimeUnit;
+
public class VersionGCInitTest {
@Rule
@@ -43,7 +45,7 @@
Document vgc = store.find(Collection.SETTINGS, "versionGC");
assertNull(vgc);
- ns.getVersionGarbageCollector();
+ ns.getVersionGarbageCollector().gc(1, TimeUnit.DAYS);
vgc = store.find(Collection.SETTINGS, "versionGC");
assertNotNull(vgc);