diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java index 9072368..eaafdd7 100755 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java @@ -297,6 +297,14 @@ public class RDBDocumentStore implements DocumentStore { @Override public List createOrUpdate(Collection collection, List updateOps) { + if (!BATCHUPDATES) { + List results = new ArrayList(updateOps.size()); + for (UpdateOp update : updateOps) { + results.add(createOrUpdate(collection, update)); + } + return results; + } + Map results = new LinkedHashMap(); Map operationsToCover = new LinkedHashMap(); Set duplicates = new HashSet(); @@ -437,8 +445,8 @@ public class RDBDocumentStore implements DocumentStore { RDBTableMetaData tmd = getTable(collection); try { connection = this.ch.getRWConnection(); + connection.setAutoCommit(true); Set successfulUpdates = db.update(connection, tmd, docsToUpdate, upsert); - connection.commit(); Set failedUpdates = Sets.difference(keysToUpdate, successfulUpdates); oldDocs.keySet().removeAll(failedUpdates); @@ -1178,7 +1186,9 @@ public class RDBDocumentStore implements DocumentStore { UpdateUtils.applyChanges(doc, update); try { Stopwatch watch = startWatch(); - insertDocuments(collection, Collections.singletonList(doc)); + if (!insertDocuments(collection, Collections.singletonList(doc))) { + throw new DocumentStoreException("Can't insert the document: " + doc.getId()); + } if (collection == Collection.NODES) { nodesCache.putIfAbsent((NodeDocument) doc); } @@ -1762,6 +1772,9 @@ public class RDBDocumentStore implements DocumentStore { // Number of elapsed ms in a query above which a diagnostic warning is generated private static final int QUERYTIMELIMIT = Integer.getInteger( 
"org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QUERYTIMELIMIT", 10000); + // Whether to use JDBC batch commands for the createOrUpdate (default: true). + private static final boolean BATCHUPDATES = Boolean.parseBoolean(System + .getProperty("org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.BATCHUPDATES", "true")); public static byte[] asBytes(String data) { byte[] bytes; diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java index e78af4c..d4b4675 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java @@ -16,12 +16,15 @@ */ package org.apache.jackrabbit.oak.plugins.document.rdb; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Sets.newHashSet; import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.CHAR2OCTETRATIO; import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.asBytes; import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet; import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement; import java.io.UnsupportedEncodingException; +import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -31,6 +34,7 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -266,8 +270,10 @@ public class RDBDocumentStoreJDBC { "insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, DATA, BDATA) " + 
"values (?, ?, ?, ?, ?, ?, ?, ?, ?)"); + List sortedDocs = sortDocuments(documents); + int[] results; try { - for (T document : documents) { + for (T document : sortedDocs) { String data = this.ser.asString(document); String id = document.getId(); Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG); @@ -293,21 +299,23 @@ public class RDBDocumentStoreJDBC { } stmt.addBatch(); } - int[] results = stmt.executeBatch(); - - Set succesfullyInserted = new HashSet(); - for (int i = 0; i < documents.size(); i++) { - int result = results[i]; - if (result != 1 && result != Statement.SUCCESS_NO_INFO) { - LOG.error("DB insert failed for {}: {}", tmd.getName(), documents.get(i).getId()); - } else { - succesfullyInserted.add(documents.get(i).getId()); - } - } - return succesfullyInserted; + results = stmt.executeBatch(); + } catch (BatchUpdateException e) { + LOG.debug("Some of the batch updates failed", e); + results = e.getUpdateCounts(); } finally { stmt.close(); } + Set succesfullyInserted = new HashSet(); + for (int i = 0; i < results.length; i++) { + int result = results[i]; + if (result != 1 && result != Statement.SUCCESS_NO_INFO) { + LOG.error("DB insert failed for {}: {}", tmd.getName(), sortedDocs.get(i).getId()); + } else { + succesfullyInserted.add(sortedDocs.get(i).getId()); + } + } + return succesfullyInserted; } /** @@ -318,6 +326,9 @@ public class RDBDocumentStoreJDBC { *

* If the {@code upsert} parameter is set to true, the method will also try to insert new documents, those * which modcount equals to 1. + *

+ * The order of applying updates will be different than order of the passed list, so there shouldn't be two + * updates related to the same document. An {@link IllegalArgumentException} will be thrown if there are. * * @param connection JDBC connection * @param tmd Table metadata @@ -328,13 +339,16 @@ public class RDBDocumentStoreJDBC { */ public Set update(Connection connection, RDBTableMetaData tmd, List documents, boolean upsert) throws SQLException { + assertNoDuplicatedIds(documents); + Set successfulUpdates = new HashSet(); + List updatedKeys = new ArrayList(); + int[] batchResults; PreparedStatement stmt = connection.prepareStatement("update " + tmd.getName() + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ? and MODCOUNT = ?"); try { - List updatedKeys = new ArrayList(); - for (T document : documents) { + for (T document : sortDocuments(documents)) { Long modcount = (Long) document.get(MODCOUNT); if (modcount == 1) { continue; // This is a new document. We'll deal with the inserts later. 
@@ -368,19 +382,21 @@ public class RDBDocumentStoreJDBC { stmt.addBatch(); updatedKeys.add(document.getId()); } - - int[] batchResults = stmt.executeBatch(); - - for (int i = 0; i < batchResults.length; i++) { - int result = batchResults[i]; - if (result == 1 || result == Statement.SUCCESS_NO_INFO) { - successfulUpdates.add(updatedKeys.get(i)); - } - } + batchResults = stmt.executeBatch(); + } catch (BatchUpdateException e) { + LOG.debug("Some of the batch updates failed", e); + batchResults = e.getUpdateCounts(); } finally { stmt.close(); } + for (int i = 0; i < batchResults.length; i++) { + int result = batchResults[i]; + if (result == 1 || result == Statement.SUCCESS_NO_INFO) { + successfulUpdates.add(updatedKeys.get(i)); + } + } + if (upsert) { List remainingDocuments = new ArrayList(documents.size() - successfulUpdates.size()); for (T doc : documents) { @@ -430,6 +446,12 @@ public class RDBDocumentStoreJDBC { return successfulUpdates; } + private static void assertNoDuplicatedIds(List documents) { + if (newHashSet(transform(documents, idExtractor)).size() < documents.size()) { + throw new IllegalArgumentException("There are duplicated ids in the document list"); + } + } + private final static Map INDEXED_PROP_MAPPING; static { Map tmp = new HashMap(); @@ -753,6 +775,17 @@ public class RDBDocumentStoreJDBC { } } + private static List sortDocuments(Collection documents) { + List result = new ArrayList(documents); + Collections.sort(result, new Comparator() { + @Override + public int compare(T o1, T o2) { + return o1.getId().compareTo(o2.getId()); + } + }); + return result; + } + private static final Function idExtractor = new Function() { @Override public String apply(Document input) { diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java new file mode 100644 index 0000000..fc3226b --- /dev/null +++ 
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java
@@ -0,0 +1,175 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.plugins.document;

import static java.util.Collections.shuffle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.junit.Test;

/**
 * Tests concurrent bulk createOrUpdate operations issued from two document
 * store instances (simulating two cluster nodes) against the same backend.
 */
public class BulkCreateOrUpdateClusterTest extends AbstractMultiDocumentStoreTest {

    public BulkCreateOrUpdateClusterTest(DocumentStoreFixture dsf) {
        super(dsf);
    }

    /**
     * Run multiple batch updates concurrently. Each thread modifies only its own documents.
     */
    @Test
    public void testConcurrentNoConflict() throws InterruptedException {
        int amountPerThread = 100;
        int threadCount = 10;
        int amount = amountPerThread * threadCount;

        List<UpdateOp> updates = new ArrayList<UpdateOp>(amount);
        // create even items up front; odd items will be created by the worker threads
        for (int i = 0; i < amount; i += 2) {
            String id = this.getClass().getName() + ".testConcurrentNoConflict" + i;
            UpdateOp up = new UpdateOp(id, true);
            up.set("_id", id);
            up.set("prop", 100);
            updates.add(up);
        }
        ds1.create(Collection.NODES, updates);

        List<Thread> threads = new ArrayList<Thread>();
        final Map<String, NodeDocument> oldDocs = new ConcurrentHashMap<String, NodeDocument>();
        for (int i = 0; i < threadCount; i++) {
            // alternate between the two store instances to exercise cluster behaviour
            final DocumentStore selectedDs = i % 2 == 0 ? this.ds1 : this.ds2;
            final List<UpdateOp> threadUpdates = new ArrayList<UpdateOp>(amountPerThread);
            for (int j = 0; j < amountPerThread; j++) {
                String id = this.getClass().getName() + ".testConcurrentNoConflict" + (j + i * amountPerThread);
                UpdateOp up = new UpdateOp(id, true);
                up.set("_id", id);
                up.set("prop", 200 + i + j);
                threadUpdates.add(up);
                removeMe.add(id);
            }
            shuffle(threadUpdates);
            threads.add(new Thread() {
                public void run() {
                    for (NodeDocument d : selectedDs.createOrUpdate(Collection.NODES, threadUpdates)) {
                        if (d == null) {
                            continue;
                        }
                        oldDocs.put(d.getId(), d);
                    }
                }
            });
        }

        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            // bounded join: an untimed join() waits forever, which would make the
            // isAlive() check below unreachable and the failure message dead code
            t.join(10000);
            if (t.isAlive()) {
                fail("Thread hasn't finished in 10s");
            }
        }

        for (int i = 0; i < amount; i++) {
            String id = this.getClass().getName() + ".testConcurrentNoConflict" + i;

            NodeDocument oldDoc = oldDocs.get(id);
            NodeDocument newDoc; // read back through the other store to avoid cache issues
            if (i % 2 == 1) {
                assertNull("The returned value should be null for created doc", oldDoc);
                newDoc = ds1.find(Collection.NODES, id);
            } else {
                assertNotNull("The returned doc shouldn't be null for updated doc", oldDoc);
                assertEquals("The old value is not correct", 100L, oldDoc.get("prop"));
                newDoc = ds2.find(Collection.NODES, id);
            }
            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));
        }
    }

    /**
     * Run multiple batch updates concurrently. Each thread modifies the same set of documents.
     */
    @Test
    public void testConcurrentWithConflict() throws InterruptedException {
        int threadCount = 10;
        int amount = 500;

        List<UpdateOp> updates = new ArrayList<UpdateOp>(amount);
        // create even items up front with prop=100 so the threads have to update them.
        // NOTE(fix): the id must use "testConcurrentWithConflict" — the previous
        // "testConcurrentNoConflict" id meant the pre-created documents were never
        // touched by the update phase, so conflicts were not actually exercised.
        for (int i = 0; i < amount; i += 2) {
            String id = this.getClass().getName() + ".testConcurrentWithConflict" + i;
            UpdateOp up = new UpdateOp(id, true);
            up.set("_id", id);
            up.set("prop", 100);
            updates.add(up);
            removeMe.add(id);
        }
        ds1.create(Collection.NODES, updates);

        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < threadCount; i++) {
            final DocumentStore selectedDs = i % 2 == 0 ? this.ds1 : this.ds2;
            final List<UpdateOp> threadUpdates = new ArrayList<UpdateOp>(amount);
            for (int j = 0; j < amount; j++) {
                String id = this.getClass().getName() + ".testConcurrentWithConflict" + j;
                UpdateOp up = new UpdateOp(id, true);
                up.set("_id", id);
                up.set("prop", 200 + i * amount + j);
                threadUpdates.add(up);
                removeMe.add(id);
            }
            shuffle(threadUpdates);
            threads.add(new Thread() {
                public void run() {
                    selectedDs.createOrUpdate(Collection.NODES, threadUpdates);
                }
            });
        }

        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join(10000);
            if (t.isAlive()) {
                fail("Thread hasn't finished in 10s");
            }
        }

        for (int i = 0; i < amount; i++) {
            String id = this.getClass().getName() + ".testConcurrentWithConflict" + i;

            NodeDocument newDoc = ds1.find(Collection.NODES, id);
            assertNotNull("The document hasn't been inserted", newDoc);
            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));

            newDoc = ds2.find(Collection.NODES, id);
            assertNotNull("The document hasn't been inserted", newDoc);
            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));
        }
    }

}
diff --git
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java index 7f26d54..1f07786 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java @@ -16,13 +16,18 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import static java.util.Collections.shuffle; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.sql.DataSource; @@ -156,6 +161,134 @@ public class BulkCreateOrUpdateTest extends AbstractDocumentStoreTest { } /** + * Run multiple batch updates concurrently. Each thread modifies only its own documents. 
+ */ + @Test + public void testConcurrentNoConflict() throws InterruptedException { + int amountPerThread = 100; + int threadCount = 10; + int amount = amountPerThread * threadCount; + + List updates = new ArrayList(amount); + // create even items + for (int i = 0; i < amount; i += 2) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 100); + updates.add(up); + } + ds.create(Collection.NODES, updates); + + List threads = new ArrayList(); + final Map oldDocs = new ConcurrentHashMap(); + for (int i = 0; i < threadCount; i++) { + final List threadUpdates = new ArrayList(amountPerThread); + for (int j = 0; j < amountPerThread; j++) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + (j + i * amountPerThread); + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 200 + i + j); + threadUpdates.add(up); + removeMe.add(id); + } + shuffle(threadUpdates); + threads.add(new Thread() { + public void run() { + for (NodeDocument d : ds.createOrUpdate(Collection.NODES, threadUpdates)) { + if (d == null) { + continue; + } + oldDocs.put(d.getId(), d); + } + } + }); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + if (t.isAlive()) { + fail("Thread hasn't finished in 10s"); + } + } + + for (int i = 0; i < amount; i++) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + + NodeDocument oldDoc = oldDocs.get(id); + NodeDocument newDoc = ds.find(Collection.NODES, id); + if (i % 2 == 1) { + assertNull("The returned value should be null for created doc", oldDoc); + } else { + assertNotNull("The returned doc shouldn't be null for updated doc", oldDoc); + assertEquals("The old value is not correct", 100l, oldDoc.get("prop")); + } + assertNotEquals("The document hasn't been updated", 100l, newDoc.get("prop")); + } + } + + /** + * Run multiple batch updates concurrently. 
Each thread modifies the same set of documents. + */ + @Test + public void testConcurrentWithConflict() throws InterruptedException { + int threadCount = 10; + int amount = 500; + + List updates = new ArrayList(amount); + // create even items + for (int i = 0; i < amount; i += 2) { + String id = this.getClass().getName() + ".testConcurrentNoConflict" + i; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 100); + updates.add(up); + removeMe.add(id); + } + ds.create(Collection.NODES, updates); + + List threads = new ArrayList(); + for (int i = 0; i < threadCount; i++) { + final List threadUpdates = new ArrayList(amount); + for (int j = 0; j < amount; j++) { + String id = this.getClass().getName() + ".testConcurrentWithConflict" + j; + UpdateOp up = new UpdateOp(id, true); + up.set("_id", id); + up.set("prop", 200 + i * amount + j); + threadUpdates.add(up); + removeMe.add(id); + } + shuffle(threadUpdates); + threads.add(new Thread() { + public void run() { + ds.createOrUpdate(Collection.NODES, threadUpdates); + } + }); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(10000); + if (t.isAlive()) { + fail("Thread hasn't finished in 10s"); + } + } + + for (int i = 0; i < amount; i++) { + String id = this.getClass().getName() + ".testConcurrentWithConflict" + i; + + NodeDocument newDoc = ds.find(Collection.NODES, id); + assertNotNull("The document hasn't been inserted", newDoc); + assertNotEquals("The document hasn't been updated", 100l, newDoc.get("prop")); + } + } + + /** * This method adds a few updateOperations modifying the same document. */ @Test