diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java new file mode 100644 index 0000000..fc3226b --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import static java.util.Collections.shuffle; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.junit.Test; + +public class BulkCreateOrUpdateClusterTest extends AbstractMultiDocumentStoreTest { + + public BulkCreateOrUpdateClusterTest(DocumentStoreFixture dsf) { + super(dsf); + } + + /** + * Run multiple batch updates concurrently. Each thread modifies only its own documents. + */ + @Test + public void testConcurrentNoConflict() throws InterruptedException { + int amountPerThread = 100; + int threadCount = 10; + int amount = amountPerThread * threadCount; + + List updates = new ArrayList(amount); + // create even items + for (int i = 0; i < amount; i += 2) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 100); + updates.add(up); + } + ds1.create(Collection.NODES, updates); + + List threads = new ArrayList(); + final Map oldDocs = new ConcurrentHashMap(); + for (int i = 0; i < threadCount; i++) { + final DocumentStore selectedDs = i % 2 == 0 ? this.ds1 : this.ds2; + final List threadUpdates = new ArrayList(amountPerThread); + for (int j = 0; j < amountPerThread; j++) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + (j + i * amountPerThread); + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 200 + i + j); + threadUpdates.add(up); + removeMe.add(id); + } + shuffle(threadUpdates); + threads.add(new Thread() { + public void run() { + for (NodeDocument d : selectedDs.createOrUpdate(Collection.NODES, threadUpdates)) { + if (d == null) { + continue; + } + oldDocs.put(d.getId(), d); + } + } + }); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + if (t.isAlive()) { + fail("Thread hasn't finished in 10s"); + } + } + + for (int i = 0; i < amount; i++) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + + NodeDocument oldDoc = oldDocs.get(id); + NodeDocument newDoc; //avoid cache issues + if (i % 2 == 1) { + assertNull("The returned value should be null for created doc", oldDoc); + newDoc = ds1.find(Collection.NODES, id); + } else { + assertNotNull("The returned doc shouldn't be null for updated doc", oldDoc); + assertEquals("The old value is not correct", 100l, oldDoc.get("prop")); + newDoc = ds2.find(Collection.NODES, id); + } + assertNotEquals("The document hasn't been updated", 100l, newDoc.get("prop")); + } + } + + /** + * Run multiple batch updates concurrently. Each thread modifies the same set of documents. + */ + @Test + public void testConcurrentWithConflict() throws InterruptedException { + int threadCount = 10; + int amount = 500; + + List updates = new ArrayList(amount); + // create even items + for (int i = 0; i < amount; i += 2) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 100); + updates.add(up); + removeMe.add(id); + } + ds1.create(Collection.NODES, updates); + + List threads = new ArrayList(); + for (int i = 0; i < threadCount; i++) { + final DocumentStore selectedDs = i % 2 == 0 ? this.ds1 : this.ds2; + final List threadUpdates = new ArrayList(amount); + for (int j = 0; j < amount; j++) { + String id = this.getClass().getName() + ".testConcurrentWithConflict" + j; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 200 + i * amount + j); + threadUpdates.add(up); + removeMe.add(id); + } + shuffle(threadUpdates); + threads.add(new Thread() { + public void run() { + selectedDs.createOrUpdate(Collection.NODES, threadUpdates); + } + }); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(10000); + if (t.isAlive()) { + fail("Thread hasn't finished in 10s"); + } + } + + for (int i = 0; i < amount; i++) { + String id = this.getClass().getName() + ".testConcurrentWithConflict" + i; + + NodeDocument newDoc = ds1.find(Collection.NODES, id); + assertNotNull("The document hasn't been inserted", newDoc); + assertNotEquals("The document hasn't been updated", 100l, newDoc.get("prop")); + + newDoc = ds2.find(Collection.NODES, id); + assertNotNull("The document hasn't been inserted", newDoc); + assertNotEquals("The document hasn't been updated", 100l, newDoc.get("prop")); + } + } + +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java index 7f26d54..1f07786 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java @@ -16,13 +16,18 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import static java.util.Collections.shuffle; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.sql.DataSource; @@ -156,6 +161,134 @@ public class BulkCreateOrUpdateTest extends AbstractDocumentStoreTest { } /** + * Run multiple batch updates concurrently. Each thread modifies only its own documents. + */ + @Test + public void testConcurrentNoConflict() throws InterruptedException { + int amountPerThread = 100; + int threadCount = 10; + int amount = amountPerThread * threadCount; + + List updates = new ArrayList(amount); + // create even items + for (int i = 0; i < amount; i += 2) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 100); + updates.add(up); + } + ds.create(Collection.NODES, updates); + + List threads = new ArrayList(); + final Map oldDocs = new ConcurrentHashMap(); + for (int i = 0; i < threadCount; i++) { + final List threadUpdates = new ArrayList(amountPerThread); + for (int j = 0; j < amountPerThread; j++) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + (j + i * amountPerThread); + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 200 + i + j); + threadUpdates.add(up); + removeMe.add(id); + } + shuffle(threadUpdates); + threads.add(new Thread() { + public void run() { + for (NodeDocument d : ds.createOrUpdate(Collection.NODES, threadUpdates)) { + if (d == null) { + continue; + } + oldDocs.put(d.getId(), d); + } + } + }); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + if (t.isAlive()) { + fail("Thread hasn't finished in 10s"); + } + } + + for (int i = 0; i < amount; i++) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + + NodeDocument oldDoc = oldDocs.get(id); + NodeDocument newDoc = ds.find(Collection.NODES, id); + if (i % 2 == 1) { + assertNull("The returned value should be null for created doc", oldDoc); + } else { + assertNotNull("The returned doc shouldn't be null for updated doc", oldDoc); + assertEquals("The old value is not correct", 100l, oldDoc.get("prop")); + } + assertNotEquals("The document hasn't been updated", 100l, newDoc.get("prop")); + } + } + + /** + * Run multiple batch updates concurrently. Each thread modifies the same set of documents. + */ + @Test + public void testConcurrentWithConflict() throws InterruptedException { + int threadCount = 10; + int amount = 500; + + List updates = new ArrayList(amount); + // create even items + for (int i = 0; i < amount; i += 2) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 100); + updates.add(up); + removeMe.add(id); + } + ds.create(Collection.NODES, updates); + + List threads = new ArrayList(); + for (int i = 0; i < threadCount; i++) { + final List threadUpdates = new ArrayList(amount); + for (int j = 0; j < amount; j++) { + String id = this.getClass().getName() + ".testConcurrentWithConflict" + j; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 200 + i * amount + j); + threadUpdates.add(up); + removeMe.add(id); + } + shuffle(threadUpdates); + threads.add(new Thread() { + public void run() { + ds.createOrUpdate(Collection.NODES, threadUpdates); + } + }); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(10000); + if (t.isAlive()) { + fail("Thread hasn't finished in 10s"); + } + } + + for (int i = 0; i < amount; i++) { + String id = this.getClass().getName() + ".testConcurrentWithConflict" + i; + + NodeDocument newDoc = ds.find(Collection.NODES, id); + assertNotNull("The document hasn't been inserted", newDoc); + assertNotEquals("The document hasn't been updated", 100l, newDoc.get("prop")); + } + } + + /** * This method adds a few updateOperations modifying the same document. */ @Test