diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java index 6d47a32..8c51949 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java @@ -202,6 +202,34 @@ public interface DocumentStore { Map> toRemove) throws DocumentStoreException; + + /** + * Batch remove documents where the given "indexed property" is within the given + * range (inclusive) - {@code [startValue, endValue]}. + *

+ * The indexed property is a {@link Long} value and numeric comparison applies. + *

+ * In case of a {@code DocumentStoreException}, the documents with the given + * keys may or may not have been removed from the store. It may also be + * possible that only some have been removed from the store. It is the + * responsibility of the caller to check which documents still exist. The + * implementation however ensures that the result of the operation is + * properly reflected in the document cache. That is, an implementation + * could simply evict documents with the given keys from the cache. + * + * @param the document type + * @param collection the collection. + * @param indexedProperty the name of the indexed property + * @param startValue the minimum value of the indexed property + * @param endValue the maximum value of the indexed property + * @return the number of removed documents. + * @throws DocumentStoreException if the operation failed. E.g. because of + * an I/O error. + */ + int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException; + /** * Try to create a list of documents. This method returns {@code true} iff * none of the documents existed before and the create was successful. This diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java index 4f09eb8..6135c86 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java @@ -90,68 +90,11 @@ public class JournalGarbageCollector { } Stopwatch sw = Stopwatch.createStarted(); - // the journal has ids of the following format: - // 1-0000014db9aaf710-00000001 - // whereas the first number is the cluster node id. - // now, this format prevents from doing a generic - // query to get all 'old' entries, as the documentstore - // can only query for a sequential list of entries. - // (and the cluster node id here partitions the set - // of entries that we have to delete) - // To account for that, we simply iterate over all - // cluster node ids and clean them up individually. - // Note that there are possible alternatives, such - // as: let each node clean up its own old entries - // but the chosen path is also quite simple: it can - // be started on any instance - but best on only one. - // if it's run on multiple concurrently, then they - // will compete at deletion, which is not optimal - // due to performance, but does not harm. - // update the tail timestamp in the journalGC document // of the settings collection updateTailTimestamp(gcOlderThan); - // 1. get the list of cluster node ids - final List clusterNodeInfos = ClusterNodeInfoDocument.all(ds); - int numDeleted = 0; - for (ClusterNodeInfoDocument clusterNodeInfoDocument : clusterNodeInfos) { - // current algorithm is to simply look at all cluster nodes - // irrespective of whether they are active or inactive etc. - // this could be optimized for inactive ones: at some point, all - // journal entries of inactive ones would have been cleaned up - // and at that point we could stop including those long-time-inactive ones. - // that 'long time' aspect would have to be tracked though, to be sure - // we don't leave garbage. - // so simpler is to quickly do a query even for long-time inactive ones - final int clusterNodeId = clusterNodeInfoDocument.getClusterId(); - - // 2. iterate over that list and do a query with - // a limit of 'batch size' - boolean branch = false; - long startPointer = 0; - while (true) { - String fromKey = JournalEntry.asId(new Revision(startPointer, 0, clusterNodeId, branch)); - String toKey = JournalEntry.asId(new Revision(gcOlderThan, 0, clusterNodeId, branch)); - List deletionBatch = ds.query(Collection.JOURNAL, fromKey, toKey, batchSize); - if (deletionBatch.size() > 0) { - ds.remove(Collection.JOURNAL, asKeys(deletionBatch)); - numDeleted += deletionBatch.size(); - } - if (deletionBatch.size() < batchSize) { - if (!branch) { - // do the same for branches: - // this will start at the beginning again with branch set to true - // and eventually finish too - startPointer = 0; - branch = true; - continue; - } - break; - } - startPointer = deletionBatch.get(deletionBatch.size() - 1).getRevisionTimestamp(); - } - } + int numDeleted = ds.remove(Collection.JOURNAL, JournalEntry.MODIFIED, 0, gcOlderThan); sw.stop(); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java index 1900d08..240d426 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java @@ -27,8 +27,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.Document; @@ -44,6 +47,7 @@ import com.google.common.base.Splitter; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; +import org.apache.jackrabbit.oak.plugins.document.util.Utils; import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.assertUnconditional; import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions; @@ -202,6 +206,31 @@ public class MemoryDocumentStore implements DocumentStore { return num; } + @Override + public int remove(Collection collection, + final String indexedProperty, final long startValue, final long endValue) + throws DocumentStoreException { + ConcurrentSkipListMap map = getMap(collection); + int num = map.size(); + + Lock lock = rwLock.writeLock(); + lock.lock(); + try { + Maps.filterValues(map, new Predicate() { + @Override + public boolean apply(@Nullable T doc) { + Long modified = Utils.asLong((Number) doc.get(indexedProperty)); + return startValue <= modified && modified <= endValue; + } + }).clear(); + } finally { + lock.unlock(); + } + + num -= map.size(); + return num; + } + @CheckForNull @Override public T createOrUpdate(Collection collection, UpdateOp update) { diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index cf55355..c3913a0 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -764,6 +764,39 @@ public class MongoDocumentStore implements DocumentStore, RevisionListener { return num; } + @Override + public int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException { + log("remove", collection, indexedProperty, startValue, endValue); + int num = 0; + DBCollection dbCollection = getDBCollection(collection); + long start = PERFLOG.start(); + try { + QueryBuilder queryBuilder = QueryBuilder.start(indexedProperty); + queryBuilder.greaterThanEquals(startValue); + queryBuilder.lessThanEquals(endValue); + try { + num = dbCollection.remove(queryBuilder.get()).getN(); + } catch (Exception e) { + throw DocumentStoreException.convert(e, "Remove failed for " + collection + ": " + + indexedProperty + " in [" + startValue + ", " + endValue + "]"); + } finally { + if (num > 0 && collection == Collection.NODES) { + // this method is currently being used only for Journal collection while GC. + // But, to keep sanctity of the API, we need to acknowledge that Nodes collection + // could've been used. But, in this signature, there's no useful way to invalidate + // cache. + // So, we use the hammer for this task + invalidateCache(); + } + } + } finally { + PERFLOG.end(start, 1, "remove from {}: {} in [{}, {}]", collection, indexedProperty, startValue, endValue); + } + return num; + } + @SuppressWarnings("unchecked") @CheckForNull private T findAndModify(Collection collection, diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java index ba037c3..7c891c3 100755 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java @@ -279,6 +279,27 @@ public class RDBDocumentStore implements DocumentStore { } @Override + public int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException { + int num = 0; + try { + num = delete(collection, indexedProperty, startValue, endValue); + } finally { + if (num > 0 && collection == Collection.NODES) { + // this method is currently being used only for Journal collection while GC. + // But, to keep sanctity of the API, we need to acknowledge that Nodes collection + // could've been used. But, in this signature, there's no useful way to invalidate + // cache. + // So, we use the hammer for this task + invalidateCache(); + } + } + return num; + } + + + @Override public boolean create(Collection collection, List updateOps) { return internalCreate(collection, updateOps); } @@ -1566,6 +1587,23 @@ public class RDBDocumentStore implements DocumentStore { return numDeleted; } + private int delete(Collection collection, + String indexedProperty, long startVal, long endVal) { + int numDeleted = 0; + RDBTableMetaData tmd = getTable(collection); + Connection connection = null; + try { + connection = this.ch.getRWConnection(); + numDeleted = db.delete(connection, tmd, indexedProperty, startVal, endVal); + connection.commit(); + } catch (Exception ex) { + throw DocumentStoreException.convert(ex, "deleting " + collection + ": " + indexedProperty + " in [" + startVal + ", " + endVal + "]"); + } finally { + this.ch.closeConnection(connection); + } + return numDeleted; + } + private boolean updateDocument(@Nonnull Collection collection, @Nonnull T document, @Nonnull UpdateOp update, Long oldmodcount) { Connection connection = null; diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java index 51e1342..cb97a3e 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java @@ -225,6 +225,23 @@ public class RDBDocumentStoreJDBC { } } + public int delete(Connection connection, RDBTableMetaData tmd, String property, long startVal, long endVal) + throws SQLException, DocumentStoreException { + if (!MODIFIED.equals(property)) { + throw new DocumentStoreException("Unsupported condition: " + property + " in [" + startVal + ", " + endVal + "]"); + } + PreparedStatement stmt = connection.prepareStatement("delete from " + tmd.getName() + + " where MODIFIED >= ? AND MODIFIED <= ?"); + try { + int i = 1; + stmt.setLong(i++, startVal); + stmt.setLong(i++, endVal); + return stmt.executeUpdate(); + } finally { + stmt.close(); + } + } + public long determineServerTimeDifferenceMillis(Connection connection) { String sql = this.dbInfo.getCurrentTimeStampInSecondsSyntax(); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java index d56a1e0..d43c7e6 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java @@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.Document; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; import org.apache.jackrabbit.oak.plugins.document.RevisionListener; import org.apache.jackrabbit.oak.plugins.document.RevisionVector; import org.apache.jackrabbit.oak.plugins.document.UpdateOp; @@ -108,6 +109,14 @@ public final class LeaseCheckDocumentStoreWrapper implements DocumentStore, Revi } @Override + public int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException { + performLeaseCheck(); + return delegate.remove(collection, indexedProperty, startValue, endValue); + } + + @Override public final boolean create(Collection collection, List updateOps) { performLeaseCheck(); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java index 7d0e903..5550b7f 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java @@ -173,6 +173,24 @@ public class LoggingDocumentStoreWrapper implements DocumentStore, RevisionListe } @Override + public int remove(final Collection collection, + final String indexedProperty, final long startValue, final long endValue) + throws DocumentStoreException { + try { + logMethod("remove", collection, indexedProperty, startValue, endValue); + return logResult(new Callable() { + @Override + public Integer call() throws Exception { + return store.remove(collection, indexedProperty, startValue, endValue); + } + }); + } catch (Exception e) { + logException(e); + throw convert(e); + } + } + + @Override public boolean create(final Collection collection, final List updateOps) { try { diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java index cf78ce9..73d3e68 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java @@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.Document; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; import org.apache.jackrabbit.oak.plugins.document.RevisionListener; import org.apache.jackrabbit.oak.plugins.document.RevisionVector; import org.apache.jackrabbit.oak.plugins.document.UpdateOp; @@ -83,6 +84,13 @@ public class SynchronizingDocumentStoreWrapper implements DocumentStore, Revisio } @Override + public int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException { + return store.remove(collection, indexedProperty, startValue, endValue); + } + + @Override public synchronized boolean create(final Collection collection, final List updateOps) { return store.create(collection, updateOps); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java index 8c55e30..9d1f538 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java @@ -212,6 +212,25 @@ public class TimingDocumentStoreWrapper implements DocumentStore, RevisionListen } @Override + public int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException { + try { + long start = now(); + int result = base.remove(collection, indexedProperty, startValue, endValue); + updateAndLogTimes("remove", start, 0, 0); + if (logCommonCall()) { + logCommonCall(start, "remove " + collection + "; indexedProperty" + indexedProperty + + "; range - [" + startValue + ", " + endValue + "]"); + } + return result; + } catch (Exception e) { + throw convert(e); + } + } + + + @Override public boolean create(Collection collection, List updateOps) { try { long start = now(); diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java index c05eccf..d21b468 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java @@ -41,8 +41,11 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Range; public class BasicDocumentStoreTest extends AbstractDocumentStoreTest { @@ -357,6 +360,50 @@ public class BasicDocumentStoreTest extends AbstractDocumentStoreTest { } } + //OAK-3001 + @Test + public void testRangeRemove() { + String idPrefix = this.getClass().getName() + ".testRangeRemove"; + + com.google.common.collect.Range modTimes = Range.closed(1L, 30L); + for (Long modTime : ContiguousSet.create(modTimes, DiscreteDomain.longs())) { + String id = idPrefix + modTime; + // remove if present + Document d = super.ds.find(Collection.JOURNAL, id); + if (d != null) { + super.ds.remove(Collection.JOURNAL, id); + } + + // add + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("_modified", modTime); + super.ds.create(Collection.JOURNAL, Collections.singletonList(up)); + } + + assertEquals("Number of entries removed didn't match", 5, + ds.remove(Collection.JOURNAL, "_modified", 20, 24)); + + assertEquals("Number of entries removed didn't match", 0, + ds.remove(Collection.JOURNAL, "_modified", 20, 24)); + + assertEquals("Number of entries removed didn't match", 5, + ds.remove(Collection.JOURNAL, "_modified", -1, 5)); + + assertEquals("Number of entries removed didn't match", 5, + ds.remove(Collection.JOURNAL, "_modified", 0, 10)); + + // interesting cases + assertEquals("Number of entries removed didn't match", 0, + ds.remove(Collection.JOURNAL, "_modified", 20, 19)); + + assertEquals("Number of entries removed didn't match", 0, + ds.remove(Collection.JOURNAL, "_modified", 31, 40)); + + assertEquals("Number of entries removed didn't match", 3, + ds.remove(Collection.JOURNAL, "_modified", 28, 40)); + } + private int testMaxId(boolean ascii) { int min = 0; int max = 32768; diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java index 685e964..39ab41e 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java @@ -149,6 +149,14 @@ public class CountingDocumentStore implements DocumentStore, RevisionListener { } @Override + public int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException { + getStats(collection).numRemoveCalls++; + return delegate.remove(collection, indexedProperty, startValue, endValue); + } + + @Override public boolean create(Collection collection, List updateOps) { getStats(collection).numCreateOrUpdateCalls++; diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java index c3198c1..cf58f79 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java @@ -88,6 +88,13 @@ public class DocumentStoreWrapper implements DocumentStore, RevisionListener { } @Override + public int remove(Collection collection, + String indexedProperty, long startValue, long endValue) + throws DocumentStoreException { + return store.remove(collection, indexedProperty, startValue, endValue); + } + + @Override public boolean create(Collection collection, List updateOps) { return store.create(collection, updateOps);