diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java index aa99f60..501066b 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java @@ -24,6 +24,7 @@ import java.util.Map.Entry; import javax.annotation.Nonnull; +import com.google.common.base.Function; import com.google.common.base.Objects; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; @@ -37,6 +38,16 @@ import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation; public class UpdateUtils { /** + * Function that transforms the UpdateOp into its id. + */ + public static final Function GET_ID = new Function() { + @Override + public String apply(UpdateOp input) { + return input.getId(); + } + }; + + /** * Apply the changes to the in-memory document. * * @param doc diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/BulkOperationStrategy.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/BulkOperationStrategy.java new file mode 100644 index 0000000..7c9adc8 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/BulkOperationStrategy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.bulk; + +import com.google.common.base.Predicate; + +/** + * BulkOperationStrategy allows to find out whether the update for a given + * document can be included in the bulk operation or should be applied + * separately. The implementing class can be stateful and may use the + * information about updated which failed or succeeded in the past. + */ +public interface BulkOperationStrategy extends Predicate { + + /** + * Check if the update for the given id can be applied in a bulk operation. + * + * @param id Document id + * @return {@code true} if the modification can be included in the bulk update + */ + boolean apply(String id); + + /** + * Inform about the conflicting update. + * @param id Document id + */ + void updateConflicted(String id); + + /** + * Inform about the succeeded update. + * @param id Document id + */ + void updateApplied(String id); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/DocumentUpdateHistory.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/DocumentUpdateHistory.java new file mode 100644 index 0000000..e350476 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/DocumentUpdateHistory.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.bulk; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.jackrabbit.oak.stats.Clock; + +/** + * The class contains a history of successful and failed bulk updates for a + * single document. + */ +public class DocumentUpdateHistory { + + private final List successes = new ArrayList(); + + private final List failures = new ArrayList(); + + private final long ttlMillis; + + private final Clock clock; + + private volatile UpdateStats stats = new UpdateStats(0, 0); + + private volatile long lastUpdate; + + private volatile long earliestSuccess = Long.MAX_VALUE; + + private volatile long earliestFailure = Long.MAX_VALUE; + + /** + * Create the new DocumentUpdateHistory + * + * @param ttlMillis how long should entries exists in the history + * @param clock + */ + public DocumentUpdateHistory(long ttlMillis, Clock clock) { + this.clock = clock; + this.ttlMillis = ttlMillis; + } + + /** + * Add new entry to the document history + * + * @param success {@code true} for a successful operation, {@code false} for failure + */ + public void add(boolean success) { + long time = clock.getTime(); + + if (success) { + synchronized (successes) { + successes.add(time); + if (earliestSuccess > time) { + earliestSuccess = time; + } + } + } else { + synchronized (failures) { + failures.add(time); + if (earliestFailure > time) { + earliestFailure = time; + } + } + } + clean(time); + + if (lastUpdate < time) { + lastUpdate = time; + } + + stats = null; + } + + private void clean(long timeNow) { + long deadline = timeNow - ttlMillis; + boolean modified = false; + if (deadline > earliestSuccess) { + synchronized (successes) { + earliestSuccess = removeSmallerThan(successes, deadline); + modified = true; + } + } + if (deadline > earliestFailure) { + synchronized (failures) { + earliestFailure = removeSmallerThan(failures, deadline); + modified = true; + } + } + if (modified) { + stats = null; + } + } + + /** + * Remove entries smaller than the {@code minValue} + * + * @param list to process + * @param minValue the limiting value + * + * @return the smallest value not removed from the queue or + * {@link Long#MAX_VALUE} if the resulting list is empty + */ + private static long removeSmallerThan(List list, long minValue) { + Iterator it = list.iterator(); + long smallestValue = Long.MAX_VALUE; + while (it.hasNext()) { + long v = it.next(); + if (v < minValue) { + it.remove(); + } else if (v < smallestValue) { + smallestValue = v; + } + } + return smallestValue; + } + + /** + * Return the update stats. + * + * @return the number of successful and failed updates + */ + public UpdateStats getCounts() { + clean(clock.getTime()); + + UpdateStats currentStats = stats; + if (currentStats == null) { + int successCount = 0; + int failureCount = 0; + synchronized (successes) { + successCount = successes.size(); + } + synchronized (failures) { + failureCount = failures.size(); + } + currentStats = new UpdateStats(successCount, failureCount); + } + stats = currentStats; + return currentStats; + } + + /** + * Check whether the history contains only out-dated entries + * + * @return {@code true} if the history no longer contains entries younger than {@link #ttlMillis} + */ + public boolean isOutdated() { + return lastUpdate < clock.getTime() - ttlMillis; + } + + public class UpdateStats { + + private final int successCount; + + private final int failureCount; + + public UpdateStats(int successCount, int failureCount) { + this.successCount = successCount; + this.failureCount = failureCount; + } + + /** + * @return the number of successful updates + */ + public int getSuccessCount() { + return successCount; + } + + /** + * @return the number of failed updates + */ + public int getFailureCount() { + return failureCount; + } + + /** + * @return the total number of updates + */ + public int getTotalCounts() { + return successCount + failureCount; + } + + /** + * @return the number between 0 and 1 describing the ratio of successful + * updates or {@link Double#NaN} if there were no updates + */ + public double getSuccessRatio() { + long total = getTotalCounts(); + if (total == 0) { + return Double.NaN; + } else { + return ((double) successCount) / total; + } + } + + @Override + public String toString() { + return String.format("UpdateStats[success=%d,failure=%d,total=%d,ratio=%.2f%%]", successCount, failureCount, + getTotalCounts(), getSuccessRatio() * 100); + } + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/DummyBulkOperationStrategy.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/DummyBulkOperationStrategy.java new file mode 100644 index 0000000..6737c61 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/DummyBulkOperationStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.bulk; + +/** + * Naive implementation of the BulkOperationStrategy. It allows all updates to + * be included in the bulk operation. + */ +public class DummyBulkOperationStrategy implements BulkOperationStrategy { + + @Override + public boolean apply(String id) { + return true; + } + + @Override + public void updateConflicted(String id) { + } + + @Override + public void updateApplied(String id) { + } + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/HistoricBulkOperationStrategy.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/HistoricBulkOperationStrategy.java new file mode 100644 index 0000000..2aef189 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/bulk/HistoricBulkOperationStrategy.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.bulk; + +import static java.util.concurrent.TimeUnit.MINUTES; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.jackrabbit.oak.plugins.document.bulk.DocumentUpdateHistory.UpdateStats; +import org.apache.jackrabbit.oak.stats.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation of the {@link BulkOperationStrategy} uses historic + * results of the bulk updates to find the documents which conflicts too often + * to be included in the batch operations. + *

+ * The algorithm is as follows: if the document was included in at least + * {@link #minAttempts} bulk operations during the last {@link #ttlMillis} and + * only less than {@link #minSuccessRatio} succeeded, then it should be excluded + * from the bulk operations. + */ +public class HistoricBulkOperationStrategy implements BulkOperationStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(HistoricBulkOperationStrategy.class); + + final ConcurrentMap histories = new ConcurrentHashMap(); + + private final Clock clock; + + /** + * How long should the strategy remember about bulk update results. + */ + private final long ttlMillis; + + /** + * How many update attempts are required to run the strategy. + */ + private final long minAttempts; + + /** + * What's the minimum success ratio to include the update in the bulk operation. + */ + private final double minSuccessRatio; + + public HistoricBulkOperationStrategy(Clock clock, long ttlMillis, long minAttempts, double minSuccessRatio) { + this.clock = clock; + this.ttlMillis = ttlMillis; + this.minAttempts = minAttempts; + this.minSuccessRatio = minSuccessRatio; + } + + public HistoricBulkOperationStrategy() { + this(Clock.SIMPLE); + } + + public HistoricBulkOperationStrategy(Clock clock) { + this(clock, MINUTES.toMillis(60), 20, 0.5d); + } + + @Override + public boolean apply(String id) { + DocumentUpdateHistory history = histories.get(id); + if (history == null) { + LOG.debug("{} will be included in the batch update. No stats gathered", id); + return true; + } + + if (history.isOutdated()) { + removeOutdated(id, history); + } + + UpdateStats counts = history.getCounts(); + if (counts.getTotalCounts() < minAttempts) { + LOG.debug("{} will be included in the batch update. Not enough stats gathered: {}", id, counts); + return true; + } + if (counts.getSuccessRatio() >= minSuccessRatio) { + LOG.debug("{} will be included in the batch update. Stats: {}", id, counts); + return true; + } else { + LOG.debug("{} won't be included in the batch update. Stats: {}", id, counts); + return false; + } + } + + @Override + public void updateConflicted(String id) { + update(id, false); + } + + @Override + public void updateApplied(String id) { + update(id, true); + } + + private void update(String id, boolean success) { + DocumentUpdateHistory history = histories.get(id); + if (history == null) { + synchronized (histories) { // synchronized to avoid conflict with #removeOutdated + history = histories.get(id); + if (history == null) { + DocumentUpdateHistory newHistory = new DocumentUpdateHistory(ttlMillis, clock); + newHistory.add(success); + histories.put(id, newHistory); + return; + } + } + } + history.add(success); + } + + private void removeOutdated(String id, DocumentUpdateHistory history) { + synchronized (histories) { + if (history.isOutdated()) { + histories.remove(id); + } + } + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index 9ca54e5..c5a1d9a 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -41,8 +41,10 @@ import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.UncheckedExecutionException; @@ -67,6 +69,8 @@ import org.apache.jackrabbit.oak.plugins.document.UpdateOp; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation; +import org.apache.jackrabbit.oak.plugins.document.bulk.BulkOperationStrategy; +import org.apache.jackrabbit.oak.plugins.document.bulk.HistoricBulkOperationStrategy; import org.apache.jackrabbit.oak.plugins.document.UpdateUtils; import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache; @@ -137,6 +141,8 @@ public class MongoDocumentStore implements DocumentStore { private final TreeNodeDocumentLocks nodeLocks; + private final BulkOperationStrategy bulkOperationStrategy; + private Clock clock = Clock.SIMPLE; private final long maxReplicationLagMillis; @@ -243,6 +249,8 @@ public class MongoDocumentStore implements DocumentStore { this.nodeLocks = new TreeNodeDocumentLocks(); this.nodesCache = builder.buildNodeDocumentCache(this, nodeLocks); + this.bulkOperationStrategy = new HistoricBulkOperationStrategy(clock); + LOG.info("Configuration maxReplicationLagMillis {}, " + "maxDeltaForModTimeIdxSecs {}, disableIndexHint {}, {}", maxReplicationLagMillis, maxDeltaForModTimeIdxSecs, @@ -883,7 +891,14 @@ public class MongoDocumentStore implements DocumentStore { break; } for (List partition : Lists.partition(Lists.newArrayList(operationsToCover.values()), bulkSize)) { - Map successfulUpdates = bulkUpdate(collection, partition, oldDocs); + Iterable filteredUpdates; + if (collection == Collection.NODES) { + filteredUpdates = Iterables.filter(partition, Predicates.compose(bulkOperationStrategy, UpdateUtils.GET_ID)); + } else { + filteredUpdates = partition; + } + + Map successfulUpdates = bulkUpdate(collection, filteredUpdates, oldDocs); results.putAll(successfulUpdates); operationsToCover.values().removeAll(successfulUpdates.keySet()); } @@ -925,7 +940,7 @@ public class MongoDocumentStore implements DocumentStore { } private Map bulkUpdate(Collection collection, - List updateOperations, + Iterable updateOperations, Map oldDocs) { Map bulkOperations = createMap(updateOperations); Set lackingDocs = difference(bulkOperations.keySet(), oldDocs.keySet()); @@ -940,6 +955,14 @@ public class MongoDocumentStore implements DocumentStore { BulkUpdateResult bulkResult = sendBulkUpdate(collection, bulkOperations.values(), oldDocs); if (collection == Collection.NODES) { + for (String id : bulkOperations.keySet()) { + if (bulkResult.failedUpdates.contains(id)) { + bulkOperationStrategy.updateConflicted(id); + } else { + bulkOperationStrategy.updateApplied(id); + } + } + for (UpdateOp op : filterKeys(bulkOperations, in(bulkResult.upserts)).values()) { NodeDocument doc = Collection.NODES.newDocument(this); UpdateUtils.applyChanges(doc, op); @@ -975,7 +998,7 @@ public class MongoDocumentStore implements DocumentStore { } } - private static Map createMap(List updateOps) { + private static Map createMap(Iterable updateOps) { return Maps.uniqueIndex(updateOps, new Function() { @Override public String apply(UpdateOp input) { diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java index cdaba3d..10e0db1 100755 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java @@ -17,8 +17,11 @@ package org.apache.jackrabbit.oak.plugins.document.rdb; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Predicates.compose; +import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.partition; +import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.GET_ID; import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions; import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet; import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement; @@ -75,6 +78,8 @@ import org.apache.jackrabbit.oak.plugins.document.UpdateOp; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation; +import org.apache.jackrabbit.oak.plugins.document.bulk.BulkOperationStrategy; +import org.apache.jackrabbit.oak.plugins.document.bulk.HistoricBulkOperationStrategy; import org.apache.jackrabbit.oak.plugins.document.UpdateUtils; import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache; @@ -347,7 +352,13 @@ public class RDBDocumentStore implements DocumentStore { } for (List partition : partition(newArrayList(operationsToCover.values()), CHUNKSIZE)) { - Map successfulUpdates = bulkUpdate(collection, partition, oldDocs, upsert); + Iterable filteredUpdates; + if (collection == Collection.NODES) { + filteredUpdates = filter(partition, compose(bulkOperationStrategy, GET_ID)); + } else { + filteredUpdates = partition; + } + Map successfulUpdates = bulkUpdate(collection, filteredUpdates, oldDocs, upsert); results.putAll(successfulUpdates); operationsToCover.values().removeAll(successfulUpdates.keySet()); } @@ -422,7 +433,7 @@ public class RDBDocumentStore implements DocumentStore { return result; } - private Map bulkUpdate(Collection collection, List updates, Map oldDocs, boolean upsert) { + private Map bulkUpdate(Collection collection, Iterable updates, Map oldDocs, boolean upsert) { Set missingDocs = new HashSet(); for (UpdateOp op : updates) { if (!oldDocs.containsKey(op.getId())) { @@ -436,7 +447,7 @@ public class RDBDocumentStore implements DocumentStore { } } - List docsToUpdate = new ArrayList(updates.size()); + List docsToUpdate = new ArrayList(); Set keysToUpdate = new HashSet(); for (UpdateOp update : updates) { String id = update.getId(); @@ -468,6 +479,9 @@ public class RDBDocumentStore implements DocumentStore { } else { nodesCache.putIfAbsent((NodeDocument) doc); } + bulkOperationStrategy.updateApplied(id); + } else { + bulkOperationStrategy.updateConflicted(id); } } } @@ -753,6 +767,7 @@ public class RDBDocumentStore implements DocumentStore { this.locks = new StripedNodeDocumentLocks(); this.nodesCache = builder.buildNodeDocumentCache(this, locks); + this.bulkOperationStrategy = new HistoricBulkOperationStrategy(); Connection con = this.ch.getRWConnection(); @@ -1834,6 +1849,8 @@ public class RDBDocumentStore implements DocumentStore { private NodeDocumentLocks locks; + private BulkOperationStrategy bulkOperationStrategy; + @CheckForNull private static NodeDocument unwrap(@Nonnull NodeDocument doc) { return doc == NodeDocument.NULL ? null : doc; diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/bulk/DocumentUpdateHistoryTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/bulk/DocumentUpdateHistoryTest.java new file mode 100644 index 0000000..48677a9 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/bulk/DocumentUpdateHistoryTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.bulk; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.jackrabbit.oak.plugins.document.bulk.DocumentUpdateHistory.UpdateStats; +import org.apache.jackrabbit.oak.stats.Clock; +import org.junit.Test; + +public class DocumentUpdateHistoryTest { + + @Test + public void testCounts() { + DocumentUpdateHistory history = new DocumentUpdateHistory(MINUTES.toMillis(1), Clock.SIMPLE); + + int success = 0; + Random random = new Random(); + for (int i = 0; i < 100; i++) { + if (random.nextBoolean()) { + success++; + history.add(true); + } else { + history.add(false); + } + } + + UpdateStats stats = history.getCounts(); + assertEquals(success, stats.getSuccessCount()); + assertEquals(100 - success, stats.getFailureCount()); + assertEquals(100, stats.getTotalCounts()); + assertEquals(((double) success) / 100, stats.getSuccessRatio(), 0.001); + } + + @Test + public void testConcurrentUpdates() throws InterruptedException { + final DocumentUpdateHistory history = new DocumentUpdateHistory(MINUTES.toMillis(1), Clock.SIMPLE); + + final AtomicInteger success = new AtomicInteger(); + List threads = new ArrayList(); + for (int i = 0; i < 10; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + Random random = new Random(); + for (int i = 0; i < 100; i++) { + if (random.nextBoolean()) { + success.incrementAndGet(); + history.add(true); + } else { + history.add(false); + } + } + } + }); + threads.add(t); + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + + UpdateStats stats = history.getCounts(); + assertEquals(success.get(), stats.getSuccessCount()); + assertEquals(1000 - success.get(), stats.getFailureCount()); + assertEquals(1000, stats.getTotalCounts()); + assertEquals(((double) success.get()) / 1000, stats.getSuccessRatio(), 0.001); + } + + @Test + public void testTtl() throws InterruptedException { + Clock.Virtual clock = new Clock.Virtual(); + DocumentUpdateHistory history = new DocumentUpdateHistory(MINUTES.toMillis(60), clock); + + for (int i = 0; i < 5; i++) { + history.add(true); + history.add(false); + } + + clock.waitUntil(clock.getTime() + MINUTES.toMillis(30)); + + for (int i = 0; i < 3; i++) { + history.add(true); + history.add(false); + } + + UpdateStats stats = history.getCounts(); + assertEquals(8, stats.getSuccessCount()); + assertEquals(8, stats.getFailureCount()); + + clock.waitUntil(clock.getTime() + MINUTES.toMillis(30)); + + stats = history.getCounts(); + assertEquals(3, stats.getSuccessCount()); + assertEquals(3, stats.getFailureCount()); + + clock.waitUntil(clock.getTime() + MINUTES.toMillis(30)); + + stats = history.getCounts(); + assertEquals(0, stats.getSuccessCount()); + assertEquals(0, stats.getFailureCount()); + } + + @Test + public void testOutdated() throws InterruptedException { + Clock.Virtual clock = new Clock.Virtual(); + DocumentUpdateHistory history = new DocumentUpdateHistory(MINUTES.toMillis(60), clock); + + for (int i = 0; i < 5; i++) { + history.add(true); + history.add(false); + } + + clock.waitUntil(clock.getTime() + MINUTES.toMillis(30)); + + for (int i = 0; i < 3; i++) { + history.add(true); + history.add(false); + } + + assertFalse(history.isOutdated()); + + clock.waitUntil(clock.getTime() + MINUTES.toMillis(30)); + + assertFalse(history.isOutdated()); + + clock.waitUntil(clock.getTime() + MINUTES.toMillis(30)); + + assertTrue(history.isOutdated()); + } + +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/bulk/HistoricBulkOperationStrategyTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/bulk/HistoricBulkOperationStrategyTest.java new file mode 100644 index 0000000..749a7bc --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/bulk/HistoricBulkOperationStrategyTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.bulk; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.jackrabbit.oak.plugins.document.bulk.DocumentUpdateHistory.UpdateStats; +import org.apache.jackrabbit.oak.stats.Clock; +import org.junit.Test; + +public class HistoricBulkOperationStrategyTest { + + @Test + public void testBasicCase() { + Clock.Virtual clock = new Clock.Virtual(); + BulkOperationStrategy strategy = new HistoricBulkOperationStrategy(clock, MINUTES.toMillis(1), 10, 0.5); + update("xyz", strategy, true, 6); + update("xyz", strategy, false, 4); + assertTrue(strategy.apply("xyz")); + + update("xyz", strategy, false, 3); + assertFalse(strategy.apply("xyz")); + } + + @Test + public void testMinimumAttempts() { + Clock.Virtual clock = new Clock.Virtual(); + BulkOperationStrategy strategy = new HistoricBulkOperationStrategy(clock, MINUTES.toMillis(1), 10, 0.5); + update("xyz", strategy, false, 9); + assertTrue(strategy.apply("xyz")); + + update("xyz", strategy, false, 1); + assertFalse(strategy.apply("xyz")); + } + + @Test + public void testTtl() { + Clock.Virtual clock = new Clock.Virtual(); + BulkOperationStrategy strategy = new HistoricBulkOperationStrategy(clock, SECONDS.toMillis(60), 10, 0.5); + update("xyz", strategy, false, 6); + clock.waitUntil(clock.getTime() + SECONDS.toMillis(30)); + update("xyz", strategy, false, 4); + + assertFalse(strategy.apply("xyz")); + + clock.waitUntil(clock.getTime() + SECONDS.toMillis(30)); + assertTrue(strategy.apply("xyz")); + } + + @Test + public void testRemoveOutdated() { + Clock.Virtual clock = new Clock.Virtual(); + HistoricBulkOperationStrategy strategy = new HistoricBulkOperationStrategy(clock, SECONDS.toMillis(60), 10, 0.5); + update("xyz", strategy, false, 10); + assertFalse(strategy.apply("xyz")); + + clock.waitUntil(clock.getTime() + SECONDS.toMillis(60)); + assertTrue(strategy.apply("xyz")); + assertTrue(strategy.histories.isEmpty()); + } + + @Test + public void testConcurrentUpdates() throws InterruptedException { + Clock.Virtual clock = new Clock.Virtual(); + final HistoricBulkOperationStrategy strategy = new HistoricBulkOperationStrategy(clock, SECONDS.toMillis(60), 10, 0.5); + + final Map successCounters = new HashMap(); + final Map updateCounters = new HashMap(); + for (int i = 0; i < 10 ; i++) { + successCounters.put("key" + i, new AtomicInteger()); + updateCounters.put("key" + i, new AtomicInteger()); + } + + List threads = new ArrayList(); + for (int i = 0; i < 10; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + Random random = new Random(); + for (int i = 0; i < 1000; i++) { + String key = "key" + random.nextInt(10); + if (random.nextBoolean()) { + successCounters.get(key).incrementAndGet(); + strategy.updateApplied(key); + } else { + strategy.updateConflicted(key); + } + updateCounters.get(key).incrementAndGet(); + } + } + }); + threads.add(t); + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + + for (int i = 0; i < 10; i++) { + String key = "key" + i; + DocumentUpdateHistory history = strategy.histories.get(key); + UpdateStats counts = history.getCounts(); + + long s = successCounters.get(key).get(); + long t = updateCounters.get(key).get(); + assertEquals(s, counts.getSuccessCount()); + assertEquals(t, counts.getTotalCounts()); + assertEquals(((double) s / t) >= 0.5, strategy.apply(key)); + } + } + + private void update(String id, BulkOperationStrategy strategy, boolean success, int times) { + for (int i = 0; i < times; i++) { + if (success) { + strategy.updateApplied(id); + } else { + strategy.updateConflicted(id); + } + } + } +}