Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (revision 1722817)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (working copy)
@@ -17,6 +17,8 @@
 package org.apache.jackrabbit.oak.plugins.document.rdb;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.partition;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement;
@@ -40,6 +42,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -81,6 +84,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.PrimitiveSink;
@@ -290,13 +294,172 @@
 
     @Override
     public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
-        List<T> result = new ArrayList<T>(updateOps.size());
-        for (UpdateOp update : updateOps) {
-            result.add(createOrUpdate(collection, update));
+        List<T> result = null;
+        Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+
+        for (UpdateOp updateOp : updateOps) {
+            UpdateUtils.assertUnconditional(updateOp);
+            UpdateOp clone = updateOp.copy();
+            addUpdateCounters(clone);
+            operationsToCover.put(clone.getId(), clone);
         }
+
+        Map<String, T> oldDocs = new HashMap<String, T>();
+        if (collection == Collection.NODES) {
+            oldDocs.putAll((Map<String, T>) readDocumentCached(collection, operationsToCover.keySet()));
+        }
+
+        int i = 0; // iteration count
+
+        // a bulk update requires two DB requests, so if we have <= 2 operations
+        // it's better to send them sequentially
+        while (operationsToCover.size() > 2) {
+            // We should try to insert documents only during the first
+            // iteration. In the 2nd and 3rd iterations we only deal with
+            // conflicting documents, so they already exist in the database
+            // and there's no point in inserting them.
+            boolean upsert = i == 0;
+
+            if (i++ == 3) {
+                // operations that conflicted in 3 consecutive bulk
+                // updates should be applied sequentially
+                break;
+            }
+
+            for (List<UpdateOp> partition : partition(newArrayList(operationsToCover.values()), CHUNKSIZE)) {
+                Set<String> successfulUpdates = bulkUpdate(collection, partition, oldDocs, upsert);
+                operationsToCover.keySet().removeAll(successfulUpdates);
+            }
+        }
+
+        // if there are some changes left, we'll apply them one after another
+        for (UpdateOp updateOp : updateOps) {
+            if (operationsToCover.remove(updateOp.getId()) != null) {
+                // work on the original update operation
+                T oldDoc = createOrUpdate(collection, updateOp.copy());
+                if (oldDoc != null) {
+                    oldDocs.put(oldDoc.getId(), oldDoc);
+                }
+            }
+        }
+
+        result = new ArrayList<T>(updateOps.size());
+        for (UpdateOp op : updateOps) {
+            result.add(oldDocs.get(op.getId()));
+        }
+
         return result;
     }
 
+    private <T extends Document> Map<String, T> readDocumentCached(Collection<T> collection, Set<String> keys) {
+        Map<String, T> documents = new HashMap<String, T>();
+
+        if (collection == Collection.NODES) {
+            for (String key : keys) {
+                NodeDocument cached = nodesCache.getIfPresent(key);
+                if (cached != null && cached != NodeDocument.NULL) {
+                    T doc = castAsT(unwrap(cached));
+                    documents.put(doc.getId(), doc);
+                }
+            }
+        }
+
+        Set<String> documentsToRead = Sets.difference(keys, documents.keySet());
+        Map<String, T> readDocuments = readDocumentsUncached(collection, documentsToRead);
+        documents.putAll(readDocuments);
+
+        if (collection == Collection.NODES) {
+            for (T doc : readDocuments.values()) {
+                nodesCache.putIfAbsent((NodeDocument) doc);
+            }
+        }
+
+        return documents;
+    }
+
+    private <T extends Document> Map<String, T> readDocumentsUncached(Collection<T> collection, Set<String> keys) {
+        Map<String, T> result = new HashMap<String, T>();
+
+        Connection connection = null;
+        RDBTableMetaData tmd = getTable(collection);
+        try {
+            connection = this.ch.getROConnection();
+            List<RDBRow> rows = db.read(connection, tmd, keys);
+
+            int size = rows.size();
+            for (int i = 0; i < size; i++) {
+                RDBRow row = rows.set(i, null);
+                T document = convertFromDBObject(collection, row);
+                result.put(document.getId(), document);
+            }
+            connection.commit();
+        } catch (Exception ex) {
+            throw new DocumentStoreException(ex);
+        } finally {
+            this.ch.closeConnection(connection);
+        }
+
+        return result;
+    }
+
+    private <T extends Document> Set<String> bulkUpdate(Collection<T> collection, List<UpdateOp> updates, Map<String, T> oldDocs, boolean upsert) {
+        Set<String> missingDocs = new HashSet<String>();
+        for (UpdateOp op : updates) {
+            if (!oldDocs.containsKey(op.getId())) {
+                missingDocs.add(op.getId());
+            }
+        }
+        for (T doc : readDocumentsUncached(collection, missingDocs).values()) {
+            oldDocs.put(doc.getId(), doc);
+            if (collection == Collection.NODES) {
+                nodesCache.putIfAbsent((NodeDocument) doc);
+            }
+        }
+
+        List<T> docsToUpdate = new ArrayList<T>(updates.size());
+        Set<String> keysToUpdate = new HashSet<String>();
+        for (UpdateOp update : updates) {
+            String id = update.getId();
+            T modifiedDoc = collection.newDocument(this);
+            if (oldDocs.containsKey(id)) {
+                oldDocs.get(id).deepCopy(modifiedDoc);
+            }
+            UpdateUtils.applyChanges(modifiedDoc, update);
+            docsToUpdate.add(modifiedDoc);
+            keysToUpdate.add(id);
+        }
+
+        Connection connection = null;
+        RDBTableMetaData tmd = getTable(collection);
+        try {
+            connection = this.ch.getRWConnection();
+            Set<String> successfulUpdates = db.update(connection, tmd, docsToUpdate, upsert);
+            connection.commit();
+
+            Set<String> failedUpdates = Sets.difference(keysToUpdate, successfulUpdates);
+            oldDocs.keySet().removeAll(failedUpdates);
+
+            if (collection == Collection.NODES) {
+                for (T doc : docsToUpdate) {
+                    String id = doc.getId();
+                    if (successfulUpdates.contains(id)) {
+                        if (oldDocs.containsKey(id)) {
+                            nodesCache.replaceCachedDocument((NodeDocument) oldDocs.get(id), (NodeDocument) doc);
+                        } else {
+                            nodesCache.putIfAbsent((NodeDocument) doc);
+                        }
+                    }
+                }
+            }
+
+            return successfulUpdates;
+        } catch (SQLException ex) {
+            this.ch.rollbackConnection(connection);
+            throw new DocumentStoreException(ex);
+        } finally {
+            this.ch.closeConnection(connection);
+        }
+    }
+
     @Override
     public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp update) {
         return internalCreateOrUpdate(collection, update, false, true);
@@ -1463,9 +1626,9 @@
         RDBTableMetaData tmd = getTable(collection);
         try {
             connection = this.ch.getRWConnection();
-            boolean result = db.insert(connection, tmd, documents);
+            Set<String> insertedKeys = db.insert(connection, tmd, documents);
             connection.commit();
-            return result;
+            return insertedKeys.size() == documents.size();
         } catch (SQLException ex) {
             this.ch.rollbackConnection(connection);
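For reference, a minimal caller-side sketch of the bulk createOrUpdate API exercised above (illustrative only, not part of the patch; "store" stands for any DocumentStore instance, and the ids and property name are made up):

    List<UpdateOp> ops = new ArrayList<UpdateOp>();
    for (int i = 0; i < 100; i++) {
        String id = "bulktest-" + i;               // hypothetical id
        UpdateOp op = new UpdateOp(id, true);      // true: the document may be created
        op.set(Document.ID, id);
        op.set("_sample", "value");                // hypothetical property
        ops.add(op);
    }
    // Returns the previous state of each document; an entry is null where the document was created.
    List<NodeDocument> previous = store.createOrUpdate(Collection.NODES, ops);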
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java (revision 1722817)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java (working copy)
@@ -29,9 +29,11 @@
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -52,6 +54,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
 /**
  * Implements (most) DB interactions used in {@link RDBDocumentStore}.
  */
@@ -150,7 +155,6 @@
     public int delete(Connection connection, RDBTableMetaData tmd, List<String> ids) throws SQLException {
         PreparedStatement stmt;
-        int cnt = ids.size();
 
         PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", ids, tmd.isIdBinary());
         String sql = "delete from " + tmd.getName() + " where " + inClause.getStatementComponent();
         stmt = connection.prepareStatement(sql);
@@ -158,7 +162,7 @@
         try {
             inClause.setParameters(stmt, 1);
             int result = stmt.executeUpdate();
-            if (result != cnt) {
+            if (result != ids.size()) {
                 LOG.debug("DB delete failed for " + tmd.getName() + "/" + ids);
             }
             return result;
@@ -256,7 +260,7 @@
         }
     }
 
-    public <T extends Document> boolean insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
+    public <T extends Document> Set<String> insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
         PreparedStatement stmt = connection.prepareStatement(
                 "insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, DATA, BDATA) "
                         + "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
@@ -289,20 +293,141 @@
                 stmt.addBatch();
             }
             int[] results = stmt.executeBatch();
-            boolean success = true;
+
+            Set<String> successfullyInserted = new HashSet<String>();
             for (int i = 0; i < documents.size(); i++) {
                 int result = results[i];
                 if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
                     LOG.error("DB insert failed for {}: {}", tmd.getName(), documents.get(i).getId());
-                    success = false;
+                } else {
+                    successfullyInserted.add(documents.get(i).getId());
                 }
             }
-            return success;
+            return successfullyInserted;
         } finally {
             stmt.close();
         }
     }
+    /**
+     * Update a list of documents using JDBC batches. Some of the updates may fail because of concurrent
+     * changes. The method returns the set of successfully updated documents. It is the caller's responsibility
+     * to compare that set with the list of input documents, find out which documents conflicted and take
+     * appropriate action.
+     * <p>
+     * If the {@code upsert} parameter is set to true, the method will also try to insert new documents, i.e.
+     * those whose modcount equals 1.
+     *
+     * @param connection JDBC connection
+     * @param tmd table metadata
+     * @param documents list of documents to update
+     * @param upsert whether new documents should be inserted
+     * @return set containing the ids of the successfully updated documents
+     * @throws SQLException
+     */
+    public <T extends Document> Set<String> update(Connection connection, RDBTableMetaData tmd, List<T> documents, boolean upsert)
+            throws SQLException {
+        Set<String> successfulUpdates = new HashSet<String>();
+
+        PreparedStatement stmt = connection.prepareStatement("update " + tmd.getName()
+                + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ? and MODCOUNT = ?");
+        try {
+            List<String> updatedKeys = new ArrayList<String>();
+            for (T document : documents) {
+                Long modcount = (Long) document.get(MODCOUNT);
+                if (modcount == 1) {
+                    continue; // This is a new document. We'll deal with the inserts later.
+                }
+
+                String data = this.ser.asString(document);
+                Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
+                Boolean deletedOnce = (Boolean) document.get(NodeDocument.DELETED_ONCE);
+                Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+
+                int si = 1;
+                stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT);
+                stmt.setObject(si++, (hasBinary != null && hasBinary.intValue() == NodeDocument.HAS_BINARY_VAL) ? 1 : 0,
+                        Types.SMALLINT);
+                stmt.setObject(si++, (deletedOnce != null && deletedOnce) ? 1 : 0, Types.SMALLINT);
+                stmt.setObject(si++, modcount, Types.BIGINT);
+                stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT);
+                stmt.setObject(si++, data.length(), Types.BIGINT);
+
+                if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) {
+                    stmt.setString(si++, data);
+                    stmt.setBinaryStream(si++, null, 0);
+                } else {
+                    stmt.setString(si++, "\"blob\"");
+                    byte[] bytes = asBytes(data);
+                    stmt.setBytes(si++, bytes);
+                }
+
+                setIdInStatement(tmd, stmt, si++, document.getId());
+                stmt.setObject(si++, modcount - 1, Types.BIGINT);
+                stmt.addBatch();
+                updatedKeys.add(document.getId());
+            }
+
+            int[] batchResults = stmt.executeBatch();
+
+            for (int i = 0; i < batchResults.length; i++) {
+                int result = batchResults[i];
+                if (result == 1 || result == Statement.SUCCESS_NO_INFO) {
+                    successfulUpdates.add(updatedKeys.get(i));
+                }
+            }
+        } finally {
+            stmt.close();
+        }
+
+        if (upsert) {
+            List<T> remainingDocuments = new ArrayList<T>(documents.size() - successfulUpdates.size());
+            for (T doc : documents) {
+                if (!successfulUpdates.contains(doc.getId())) {
+                    remainingDocuments.add(doc);
+                }
+            }
+
+            if (!remainingDocuments.isEmpty()) {
+                List<String> remainingDocumentIds = Lists.transform(remainingDocuments, idExtractor);
+                PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", remainingDocumentIds, tmd.isIdBinary());
+                StringBuilder sql = new StringBuilder("select ID from ").append(tmd.getName());
+                sql.append(" where ").append(inClause.getStatementComponent());
+
+                Set<String> documentsWithUpdatedModcount = new HashSet<String>();
+
+                PreparedStatement selectStmt = null;
+                ResultSet rs = null;
+                try {
+                    selectStmt = connection.prepareStatement(sql.toString());
+                    selectStmt.setPoolable(false);
+                    inClause.setParameters(selectStmt, 1);
+                    rs = selectStmt.executeQuery();
+                    while (rs.next()) {
+                        documentsWithUpdatedModcount.add(getIdFromRS(tmd, rs, 1));
+                    }
+                } finally {
+                    closeResultSet(rs);
+                    closeStatement(selectStmt);
+                }
+
+                Iterator<T> it = remainingDocuments.iterator();
+                while (it.hasNext()) {
+                    if (documentsWithUpdatedModcount.contains(it.next().getId())) {
+                        it.remove();
+                    }
+                }
+
+                if (!remainingDocuments.isEmpty()) {
+                    for (String id : insert(connection, tmd, remainingDocuments)) {
+                        successfulUpdates.add(id);
+                    }
+                }
+            }
+        }
+
+        return successfulUpdates;
+    }
+
     private final static Map<String, String> INDEXED_PROP_MAPPING;
 
     static {
         Map<String, String> tmp = new HashMap<String, String>();
@@ -450,6 +575,58 @@
         return result;
     }
 
+    public List<RDBRow> read(Connection connection, RDBTableMetaData tmd, Collection<String> keys) throws SQLException {
+        if (keys.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", keys, tmd.isIdBinary());
+        StringBuilder query = new StringBuilder();
+        query.append("select ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from ");
+        query.append(tmd.getName());
+        query.append(" where ").append(inClause.getStatementComponent());
+
+        PreparedStatement stmt = connection.prepareStatement(query.toString());
+        stmt.setPoolable(false);
+        try {
+            inClause.setParameters(stmt, 1);
+            ResultSet rs = stmt.executeQuery();
+
+            List<RDBRow> rows = new ArrayList<RDBRow>();
+            while (rs.next()) {
+                int col = 1;
+                String id = getIdFromRS(tmd, rs, col++);
+                long modified = rs.getLong(col++);
+                long modcount = rs.getLong(col++);
+                long cmodcount = rs.getLong(col++);
+                long hasBinary = rs.getLong(col++);
+                long deletedOnce = rs.getLong(col++);
+                String data = rs.getString(col++);
+                byte[] bdata = rs.getBytes(col++);
+                RDBRow row = new RDBRow(id, hasBinary == 1, deletedOnce == 1, modified, modcount, cmodcount, data, bdata);
+                rows.add(row);
+            }
+
+            return rows;
+        } catch (SQLException ex) {
+            LOG.error("attempting to read " + keys, ex);
+            // DB2 throws an SQLException for invalid keys; handle this more
+            // gracefully
+            if ("22001".equals(ex.getSQLState())) {
+                try {
+                    connection.rollback();
+                } catch (SQLException ex2) {
+                    LOG.debug("failed to rollback", ex2);
+                }
+                return null;
+            } else {
+                throw ex;
+            }
+        } finally {
+            stmt.close();
+        }
+    }
+
     @CheckForNull
     public RDBRow read(Connection connection, RDBTableMetaData tmd, String id, long lastmodcount) throws SQLException {
         PreparedStatement stmt;
@@ -574,4 +751,11 @@
             stmt.setString(idx, id);
         }
     }
+
+    private static final Function<Document, String> idExtractor = new Function<Document, String>() {
+        @Override
+        public String apply(Document input) {
+            return input.getId();
+        }
+    };
 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java (revision 1722817)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java (working copy)
@@ -29,6 +29,7 @@
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -328,7 +329,7 @@
         builder.append(')');
     }
 
-    public static PreparedStatementComponent createInStatement(final String fieldName, final List<String> values,
+    public static PreparedStatementComponent createInStatement(final String fieldName, final Collection<String> values,
             final boolean binary) {
 
         return new PreparedStatementComponent() {
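The new update() method above relies on a conditional batched UPDATE: a row is only modified if MODCOUNT still holds its previous value, so a concurrent writer that already bumped MODCOUNT makes the statement affect zero rows. A simplified standalone sketch of that pattern (illustrative only; just DATA, MODCOUNT and ID are shown, the table name NODES is the RDB default, and newData, modcount and id are assumed local variables):

    PreparedStatement stmt = connection.prepareStatement(
            "update NODES set DATA = ?, MODCOUNT = ? where ID = ? and MODCOUNT = ?");
    stmt.setString(1, newData);
    stmt.setLong(2, modcount);        // new value
    stmt.setString(3, id);
    stmt.setLong(4, modcount - 1);    // expected previous value
    stmt.addBatch();
    int[] results = stmt.executeBatch();
    // results[0] == 1 (or Statement.SUCCESS_NO_INFO): the row was updated;
    // results[0] == 0: a concurrent change won, and the caller has to re-read and retry.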
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java (revision 1722817)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java (working copy)
@@ -23,6 +23,9 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.sql.DataSource;
+
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceWrapper;
 import org.junit.Test;
 
 public class BulkCreateOrUpdateTest extends AbstractDocumentStoreTest {
@@ -29,6 +32,11 @@
 
     public BulkCreateOrUpdateTest(DocumentStoreFixture dsf) {
         super(dsf);
+        DataSource ds = dsf.getRDBDataSource();
+        if (ds instanceof RDBDataSourceWrapper) {
+            // test drivers that do not return precise batch results
+            ((RDBDataSourceWrapper) ds).setBatchResultPrecise(false);
+        }
     }
 
     /**
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java (revision 1722817)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java (working copy)
@@ -18,9 +18,12 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import org.junit.Test;
@@ -209,6 +212,68 @@
         assertTrue(nd1.getLastCheckTime() > ds1checktime);
     }
 
+    @Test
+    public void testInterleavedBatchUpdate() {
+        int amount = 10;
+        int halfAmount = amount / 2;
+        String baseId = this.getClass().getName() + ".testInterleavedBatchUpdate";
+
+        // remove if present
+        for (int i = 0; i < amount; i++) {
+            String id = baseId + "-" + i;
+            NodeDocument nd = super.ds1.find(Collection.NODES, id);
+            if (nd != null) {
+                super.ds1.remove(Collection.NODES, id);
+            }
+            removeMe.add(id);
+        }
+
+        {
+            // create half of the entries in ds1
+            List<UpdateOp> ops = new ArrayList<UpdateOp>();
+            for (int i = 0; i < halfAmount; i++) {
+                String id = baseId + "-" + i;
+                UpdateOp up = new UpdateOp(id, true);
+                up.set(Document.ID, id);
+                up.set("_createdby", "ds1");
+                ops.add(up);
+            }
+            List<NodeDocument> result = super.ds1.createOrUpdate(Collection.NODES, ops);
+            assertEquals(halfAmount, result.size());
+            for (NodeDocument doc : result) {
+                assertNull(doc);
+            }
+        }
+
+        {
+            // create all of the entries in ds2
+            List<UpdateOp> ops = new ArrayList<UpdateOp>();
+            for (int i = 0; i < amount; i++) {
+                String id = baseId + "-" + i;
+                UpdateOp up = new UpdateOp(id, true);
+                up.set(Document.ID, id);
+                up.set("_createdby", "ds2");
+                ops.add(up);
+            }
+            List<NodeDocument> result = super.ds2.createOrUpdate(Collection.NODES, ops);
+            assertEquals(amount, result.size());
+            for (NodeDocument doc : result) {
+                // documents are either new or have been created by ds1
+                if (doc != null) {
+                    assertEquals("ds1", doc.get("_createdby"));
+                }
+            }
+        }
+
+        // final check: does ds1 see all documents, including the changes made by ds2?
+        for (int i = 0; i < amount; i++) {
+            String id = baseId + "-" + i;
+            NodeDocument doc = super.ds1.find(Collection.NODES, id, 0);
+            assertNotNull(doc);
+            assertEquals("ds2", doc.get("_createdby"));
+        }
+    }
+
     private static long letTimeElapse() {
         long ts = System.currentTimeMillis();
         while (System.currentTimeMillis() == ts) {
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java (revision 1722817)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java (working copy)
@@ -53,6 +53,7 @@
     // }
 
     private final DataSource ds;
+    private boolean batchResultPrecise = true;
 
     // Logging
 
@@ -84,6 +85,17 @@
         return stopLog(Thread.currentThread());
     }
 
+    /**
+     * Set to {@code false} to simulate drivers/DBs that do not return the number of affected rows in {@link Statement#executeBatch()}.
+     */
+    public void setBatchResultPrecise(boolean precise) {
+        this.batchResultPrecise = precise;
+    }
+
+    public boolean isBatchResultPrecise() {
+        return this.batchResultPrecise;
+    }
+
     // DataSource
 
     public RDBDataSourceWrapper(DataSource ds) {
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java (revision 1722817)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java (working copy)
@@ -36,6 +36,7 @@
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLWarning;
 import java.sql.SQLXML;
+import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -125,6 +126,14 @@
         SQLException x = null;
         try {
             results = statement.executeBatch();
+            // not all JDBC drivers return the number of affected rows
+            if (!datasource.isBatchResultPrecise()) {
+                for (int i = 0; i < results.length; i++) {
+                    if (results[i] >= 0) {
+                        results[i] = Statement.SUCCESS_NO_INFO;
+                    }
+                }
+            }
             return results;
         } catch (SQLException ex) {
             x = ex;
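For context on the wrapper change above: the JDBC specification allows executeBatch() to report Statement.SUCCESS_NO_INFO (-2) instead of an exact row count, which is exactly the behaviour being simulated here. A minimal sketch of how a caller has to interpret the result array (illustrative only, not part of the patch):

    int[] results = stmt.executeBatch();
    for (int result : results) {
        if (result == Statement.EXECUTE_FAILED) {
            // the statement failed
        } else if (result == Statement.SUCCESS_NO_INFO) {
            // executed, but the affected-row count is unknown; the patch treats this as success
        } else {
            // result is the affected-row count; 0 signals a lost conditional update
        }
    }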