diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
index 0e90806..f5bbc77 100755
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
@@ -280,13 +284,155 @@ public class RDBDocumentStore implements DocumentStore {
 
     @Override
     public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
-        List<T> result = new ArrayList<T>(updateOps.size());
-        for (UpdateOp update : updateOps) {
-            result.add(createOrUpdate(collection, update));
+        List<T> result = null;
+        Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+
+        for (UpdateOp updateOp : updateOps) {
+            UpdateUtils.assertUnconditional(updateOp);
+            UpdateOp clone = updateOp.copy();
+            operationsToCover.put(updateOp.getId(), clone);

JRE: this should use #addUpdateCounters(). I would also move the .put below the counter updates for clarity.

+            clone.increment(MODCOUNT, 1);
+            if (hasChangesToCollisions(clone)) {
+                clone.increment(COLLISIONSMODCOUNT, 1);
+            }
+        }
+
+        Map<String, T> oldDocs = new HashMap<String, T>();
+        if (collection == Collection.NODES) {
+            oldDocs.putAll((Map<String, T>) readDocumentCached(collection, operationsToCover.keySet()));
+        }
+

JRE: the for loop is hard to understand (where do the constants and the conditions come from?); maybe this should be a while loop.

+        for (int i = 0; i < 3 && operationsToCover.size() > 2; i++) {
+            for (List<UpdateOp> partition : partition(newArrayList(operationsToCover.values()), CHUNKSIZE)) {
+                Set<String> successfulUpdates = bulkUpdate(collection, partition, oldDocs, i == 0);
+                operationsToCover.keySet().removeAll(successfulUpdates);
+            }
+        }
+
+        // if there are some changes left, we'll apply them one after
+        // another
+        for (UpdateOp updateOp : updateOps) {
+            if (operationsToCover.remove(updateOp.getId()) != null) {
+                // work on a copy of the original update operation
+                T oldDoc = createOrUpdate(collection, updateOp.copy());
+                if (oldDoc != null) {
+                    oldDocs.put(oldDoc.getId(), oldDoc);
+                }
+            }
+        }
+
+        result = new ArrayList<T>(updateOps.size());
+        for (UpdateOp op : updateOps) {
+            result.add(oldDocs.get(op.getId()));
+        }
+
+        return result;
+    }
+
+    private <T extends Document> Map<String, T> readDocumentCached(Collection<T> collection, Set<String> keys) {
+        Map<String, T> documents = new HashMap<String, T>();
+
+        if (collection == Collection.NODES) {
+            for (String key : keys) {
+                NodeDocument cached = nodesCache.getIfPresent(key);
+                if (cached != null && cached != NodeDocument.NULL) {
+                    T doc = castAsT(unwrap(cached));
+                    documents.put(doc.getId(), doc);
+                }
+            }
+        }
+
+        Set<String> documentsToRead = Sets.difference(keys, documents.keySet());
+        Map<String, T> readDocuments = readDocumentsUncached(collection, documentsToRead);
+        documents.putAll(readDocuments);
+
+        if (collection == Collection.NODES) {
+            for (T doc : readDocuments.values()) {
+                nodesCache.putIfAbsent((NodeDocument) doc);
+            }
+        }
+
+        return documents;
+    }
+
+    private <T extends Document> Map<String, T> readDocumentsUncached(Collection<T> collection, Set<String> keys) {
+        Map<String, T> result = new HashMap<String, T>();
+
+        Connection connection = null;
+        RDBTableMetaData tmd = getTable(collection);
+        try {
+            connection = this.ch.getROConnection();

JRE: it might be good to free RDBRow as soon as possible, see RDBDocumentStore#internalQuery.

+            List<RDBRow> rows = db.read(connection, tmd, keys);
+            for (RDBRow row : rows) {
+                T document = convertFromDBObject(collection, row);
+                result.put(document.getId(), document);
+            }
+        } catch (Exception ex) {
+            throw new DocumentStoreException(ex);
+        } finally {
+            this.ch.closeConnection(connection);
         }
         return result;
     }
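Re JRE's two comments on createOrUpdate above, a sketch of how the method body might read with both suggestions applied (assuming addUpdateCounters(UpdateOp) is the existing RDBDocumentStore helper that increments MODCOUNT and, for collision changes, COLLISIONSMODCOUNT; MAX_BULK_RETRIES and FALLBACK_THRESHOLD are hypothetical names for the bare literals 3 and 2, which would also answer where the constants come from):

    // sketch only, not part of the patch
    for (UpdateOp updateOp : updateOps) {
        UpdateUtils.assertUnconditional(updateOp);
        UpdateOp clone = updateOp.copy();
        addUpdateCounters(clone);                       // replaces the two manual increments
        operationsToCover.put(updateOp.getId(), clone); // .put moved below the counter updates
    }

    int attempts = 0;
    // retry bulk rounds until the limit is reached, or until so few operations
    // remain that the per-document fallback is cheaper than another round trip
    while (attempts < MAX_BULK_RETRIES && operationsToCover.size() > FALLBACK_THRESHOLD) {
        boolean upsert = attempts == 0; // only the first round may create new documents
        for (List<UpdateOp> chunk : partition(newArrayList(operationsToCover.values()), CHUNKSIZE)) {
            operationsToCover.keySet().removeAll(bulkUpdate(collection, chunk, oldDocs, upsert));
        }
        attempts++;
    }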
 
+    private <T extends Document> Set<String> bulkUpdate(Collection<T> collection, List<UpdateOp> updates, Map<String, T> oldDocs, boolean upsert) {
+        Set<String> missingDocs = new HashSet<String>();
+        for (UpdateOp op : updates) {
+            if (!oldDocs.containsKey(op.getId())) {
+                missingDocs.add(op.getId());
+            }
+        }
+        for (T doc : readDocumentsUncached(collection, missingDocs).values()) {
+            oldDocs.put(doc.getId(), doc);
+            if (collection == Collection.NODES) {
+                nodesCache.putIfAbsent((NodeDocument) doc);
+            }
+        }
+
+        List<T> docsToUpdate = new ArrayList<T>(updates.size());
+        Set<String> keysToUpdate = new HashSet<String>();
+        for (UpdateOp update : updates) {
+            String id = update.getId();
+            T modifiedDoc = collection.newDocument(this);
+            if (oldDocs.containsKey(id)) {
+                oldDocs.get(id).deepCopy(modifiedDoc);
+            }
+            UpdateUtils.applyChanges(modifiedDoc, update);
+            docsToUpdate.add(modifiedDoc);
+            keysToUpdate.add(id);
+        }
+
+        Connection connection = null;
+        RDBTableMetaData tmd = getTable(collection);
+        try {
+            connection = this.ch.getRWConnection();
+            Set<String> successfulUpdates = db.update(connection, tmd, docsToUpdate, upsert);
+            connection.commit();
+
+            Set<String> failedUpdates = Sets.difference(keysToUpdate, successfulUpdates);
+            oldDocs.keySet().removeAll(failedUpdates);
+
+            if (collection == Collection.NODES) {
+                for (T doc : docsToUpdate) {
+                    String id = doc.getId();
+                    if (successfulUpdates.contains(id)) {
+                        if (oldDocs.containsKey(id)) {
+                            nodesCache.replaceCachedDocument((NodeDocument) oldDocs.get(id), (NodeDocument) doc);
+                        } else {
+                            nodesCache.putIfAbsent((NodeDocument) doc);
+                        }
+                    }
+                }
+            }
+
+            return successfulUpdates;
+        } catch (SQLException ex) {
+            this.ch.rollbackConnection(connection);
+            throw new DocumentStoreException(ex);
+        } finally {
+            this.ch.closeConnection(connection);
+        }
+    }
+
     @Override
     public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp update) {
         return internalCreateOrUpdate(collection, update, false, true);
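For context, a sketch of how a caller would exercise the new bulk path (hypothetical ids and property names; imports and store setup elided; store is a DocumentStore). Per the patch above, the returned list holds the previous state of each document, in operation order, with null entries for documents that did not exist yet:

    List<UpdateOp> ops = new ArrayList<UpdateOp>();
    for (String id : Arrays.asList("1:/foo", "1:/bar")) {
        UpdateOp op = new UpdateOp(id, true); // true: the document may be created
        op.set("prop", "value");
        ops.add(op);
    }
    List<NodeDocument> previous = store.createOrUpdate(Collection.NODES, ops);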
@@ -812,6 +958,7 @@ public class RDBDocumentStore implements DocumentStore {
                     diagnostics.append(" ").append(indexInfo);
                 }
             }
+            rs.close();

JRE: no unrelated changes, please...

         } catch (SQLException ex2) {
             LOG.error("Failed to create table " + tableName + " in " + dbname, ex2);
@@ -1449,9 +1596,9 @@ public class RDBDocumentStore implements DocumentStore {
         RDBTableMetaData tmd = getTable(collection);
         try {
             connection = this.ch.getRWConnection();
-            boolean result = db.insert(connection, tmd, documents);
+            Set<String> insertedKeys = db.insert(connection, tmd, documents);
             connection.commit();
-            return result;
+            return insertedKeys.size() == documents.size();
         } catch (SQLException ex) {
             this.ch.rollbackConnection(connection);
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
index 6585613..bd63ba1 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.rdb;
 
+import static com.google.common.collect.Iterables.cycle;
+import static com.google.common.collect.Iterables.limit;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.CHAR2OCTETRATIO;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.asBytes;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
@@ -29,9 +31,14 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -47,6 +54,8 @@ import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStoreDB.Prepare
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+
 /**
  * Implements (most) DB interactions used in {@link RDBDocumentStore}.
  */
@@ -57,6 +66,7 @@ public class RDBDocumentStoreJDBC {
     private static final String COLLISIONSMODCOUNT = RDBDocumentStore.COLLISIONSMODCOUNT;
     private static final String MODCOUNT = NodeDocument.MOD_COUNT;
     private static final String MODIFIED = NodeDocument.MODIFIED_IN_SECS;
+    private static final int MAX_LIST_LENGTH = 1000; // ORA-01795: maximum number of expressions in a list is 1000

JRE: that sounds a bit like something that needs to be fixed separately anyway (it would hit us already if somebody set CHUNKSIZE to a large value, right?); potentially in RDBBlobStore as well (we should address this as a separate problem, to be fixed in 1.2 and 1.0 right away).

     private final RDBDocumentStoreDB dbInfo;
     private final RDBDocumentSerializer ser;
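For illustration, once a key list crosses that limit, the appendInCondition helper introduced later in this patch splits it into multiple IN lists joined with "or" (sketch; 2500 keys, limit 1000):

    StringBuilder where = new StringBuilder();
    appendInCondition(where, "ID", 2500, 1000);
    // where now reads: (ID in (...) or ID in (...) or ID in (...))
    // with 1000, 1000 and 500 placeholders respectively, so no single
    // expression list exceeds the ORA-01795 limit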
@@ -120,14 +130,8 @@ public class RDBDocumentStoreJDBC {
         t.append(setModifiedConditionally ? "MODIFIED = case when ? > MODIFIED then ? else MODIFIED end, " : "MODIFIED = ?, ");
         t.append("MODCOUNT = MODCOUNT + 1, DSIZE = DSIZE + ?, ");
         t.append("DATA = " + stringAppend.getStatementComponent() + " ");
-        t.append("where ID in (");
-        for (int i = 0; i < ids.size(); i++) {
-            if (i != 0) {
-                t.append(',');
-            }
-            t.append('?');
-        }
-        t.append(")");
+        t.append("where ");
+        appendInCondition(t, "ID", ids.size(), MAX_LIST_LENGTH);
         PreparedStatement stmt = connection.prepareStatement(t.toString());
         try {
             int si = 1;
@@ -271,7 +275,7 @@ public class RDBDocumentStoreJDBC {
         }
     }
 
-    public <T extends Document> boolean insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
+    public <T extends Document> Set<String> insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
         PreparedStatement stmt = connection.prepareStatement(
                 "insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, DATA, BDATA) "
                         + "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
@@ -304,20 +308,127 @@ public class RDBDocumentStoreJDBC {
                 stmt.addBatch();
             }
             int[] results = stmt.executeBatch();
-            boolean success = true;
+
+            Set<String> succesfullyInserted = new HashSet<String>();
             for (int i = 0; i < documents.size(); i++) {
                 int result = results[i];
                 if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
                     LOG.error("DB insert failed for {}: {}", tmd.getName(), documents.get(i).getId());
-                    success = false;
+                } else {
+                    succesfullyInserted.add(documents.get(i).getId());
                 }
             }
-            return success;
+            return succesfullyInserted;
         } finally {
             stmt.close();
         }
     }

JRE: are we sure that the sequence of statements implemented here will actually do the right thing when done concurrently, given the transaction isolation that we have?

+    public <T extends Document> Set<String> update(Connection connection, RDBTableMetaData tmd, List<T> documents, boolean upsert)
+            throws SQLException {
+        Set<String> successfulUpdates = new HashSet<String>();
+
+        PreparedStatement stmt = connection.prepareStatement("update " + tmd.getName()
+                + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ? and MODCOUNT = ?");
+        try {
+            List<String> updatedKeys = new ArrayList<String>();
+            for (T document : documents) {
+                Long modcount = (Long) document.get(MODCOUNT);
+                if (modcount == 1) {
+                    continue;
+                }

JRE: is this a check for documents that are new? If so, please add a comment...

+
+                String data = this.ser.asString(document);
+                Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
+                Boolean deletedOnce = (Boolean) document.get(NodeDocument.DELETED_ONCE);
+                Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+
+                int si = 1;
+                stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT);
+                stmt.setObject(si++, (hasBinary != null && hasBinary.intValue() == NodeDocument.HAS_BINARY_VAL) ? 1 : 0,
+                        Types.SMALLINT);
+                stmt.setObject(si++, (deletedOnce != null && deletedOnce) ? 1 : 0, Types.SMALLINT);
+                stmt.setObject(si++, modcount, Types.BIGINT);
+                stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT);
+                stmt.setObject(si++, data.length(), Types.BIGINT);
+
+                if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) {
+                    stmt.setString(si++, data);
+                    stmt.setBinaryStream(si++, null, 0);
+                } else {
+                    stmt.setString(si++, "\"blob\"");
+                    byte[] bytes = asBytes(data);
+                    stmt.setBytes(si++, bytes);
+                }
+
+                setIdInStatement(tmd, stmt, si++, document.getId());
+                stmt.setObject(si++, modcount - 1, Types.BIGINT);
+                stmt.addBatch();
+                updatedKeys.add(document.getId());
+            }
+
+            int[] batchResults = stmt.executeBatch();
+
+            for (int i = 0; i < batchResults.length; i++) {
+                int result = batchResults[i];
+                if (result == 1) {
+                    successfulUpdates.add(updatedKeys.get(i));
+                }
+            }
+        } finally {
+            stmt.close();
+        }
+
+        if (upsert) {
+            List<T> remainingDocuments = new ArrayList<T>(documents.size() - successfulUpdates.size());
+            for (T doc : documents) {
+                if (!successfulUpdates.contains(doc.getId())) {
+                    remainingDocuments.add(doc);
+                }
+            }
+
+            if (!remainingDocuments.isEmpty()) {
+                StringBuilder sql = new StringBuilder("select ID from ").append(tmd.getName());
+                sql.append(" where ");
+                appendInCondition(sql, "ID", remainingDocuments.size(), MAX_LIST_LENGTH);
+
+                Set<String> documentsWitUpdatedModcount = new HashSet<String>();

JRE: typo in variable name.

+
+                PreparedStatement selectStmt = null;
+                ResultSet rs = null;
+                try {
+                    selectStmt = connection.prepareStatement(sql.toString());
+                    selectStmt.setPoolable(false);
+                    int i = 1;
+                    for (T doc : remainingDocuments) {
+                        setIdInStatement(tmd, selectStmt, i++, doc.getId());
+                    }
+                    rs = selectStmt.executeQuery();
+                    while (rs.next()) {
+                        documentsWitUpdatedModcount.add(getIdFromRS(tmd, rs, 1));
+                    }
+                } finally {
+                    closeResultSet(rs);
+                    closeStatement(selectStmt);
+                }

JRE: OK, so we drop the concurrently modified documents from the update? Why is this OK? What's the contract here?

+                Iterator<T> it = remainingDocuments.iterator();
+                while (it.hasNext()) {
+                    if (documentsWitUpdatedModcount.contains(it.next().getId())) {
+                        it.remove();
+                    }
+                }
+
+                if (!remainingDocuments.isEmpty()) {
+                    for (String id : insert(connection, tmd, remainingDocuments)) {
+                        successfulUpdates.add(id);
+                    }
+                }
+            }
+        }
+        return successfulUpdates;
+    }
+
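On JRE's two questions about this sequence (transaction isolation above, the contract here), one possible reading of the patch, not authoritative: each batched UPDATE carries "where ID = ? and MODCOUNT = ?", i.e. a per-row compare-and-set, so a document whose MODCOUNT moved concurrently is neither overwritten nor re-inserted; it is merely left out of successfulUpdates, which RDBDocumentStore.bulkUpdate/createOrUpdate treat as a conflict to retry, ultimately through the per-document path. The residual race is between the follow-up SELECT and the INSERT:

    // Writers A and B both hold document d with MODCOUNT = 7 and batch:
    //   update ... set MODCOUNT = 8, ... where ID = 'd' and MODCOUNT = 7
    // A matches 1 row; B matches 0 rows, so 'd' is absent from B's
    // successfulUpdates. B's follow-up SELECT then finds the row, so B skips
    // the INSERT branch (no duplicate-key error) and B's caller re-reads 'd'
    // and retries with fresh counters. If instead B's SELECT runs before A
    // inserts a brand-new 'd', B's INSERT fails on the primary key; depending
    // on the driver this shows up as a failed batch entry in insert() or as a
    // BatchUpdateException, which would surface as a DocumentStoreException.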
     @Nonnull
     public List<RDBRow> query(Connection connection, RDBTableMetaData tmd, String minId, String maxId, String indexedProperty,
             long startValue, int limit) throws SQLException {
@@ -437,6 +548,62 @@ public class RDBDocumentStoreJDBC {
         return result;
     }
 
+    public List<RDBRow> read(Connection connection, RDBTableMetaData tmd, Collection<String> keys) throws SQLException {
+        if (keys.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        StringBuilder query = new StringBuilder();
+        query.append("select ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from ");
+        query.append(tmd.getName());
+        query.append(" where ");
+        appendInCondition(query, "ID", keys.size(), MAX_LIST_LENGTH);
+
+        PreparedStatement stmt = connection.prepareStatement(query.toString());
+        stmt.setPoolable(false);
+        try {
+            int i = 1;
+            for (String id : keys) {
+                setIdInStatement(tmd, stmt, i++, id);
+            }
+
+            ResultSet rs = stmt.executeQuery();
+
+            List<RDBRow> rows = new ArrayList<RDBRow>();
+            while (rs.next()) {
+                int col = 1;
+                String id = getIdFromRS(tmd, rs, col++);
+                long modified = rs.getLong(col++);
+                long modcount = rs.getLong(col++);
+                long cmodcount = rs.getLong(col++);
+                long hasBinary = rs.getLong(col++);
+                long deletedOnce = rs.getLong(col++);
+                String data = rs.getString(col++);
+                byte[] bdata = rs.getBytes(col++);
+                RDBRow row = new RDBRow(id, hasBinary == 1, deletedOnce == 1, modified, modcount, cmodcount, data, bdata);
+                rows.add(row);
+            }
+
+            return rows;
+        } catch (SQLException ex) {
+            LOG.error("attempting to read " + keys, ex);
+            // DB2 throws an SQLException for invalid keys; handle this more
+            // gracefully
+            if ("22001".equals(ex.getSQLState())) {
+                try {
+                    connection.rollback();
+                } catch (SQLException ex2) {
+                    LOG.debug("failed to rollback", ex2);
+                }
+                return null;
+            } else {
+                throw (ex);
+            }
+        } finally {
+            stmt.close();
+        }
+    }
+
     @CheckForNull
     public RDBRow read(Connection connection, RDBTableMetaData tmd, String id, long lastmodcount) throws SQLException {
         PreparedStatement stmt;
@@ -536,6 +703,53 @@ public class RDBDocumentStoreJDBC {
         }
     }
 
+    /**
+     * Appends the following SQL condition to the builder: {@code ID in (?,?,?)}.
+     * The field name {@code ID} and the number of placeholders are configurable.
+     * If the number of placeholders is greater than {@code maxListLength}, the
+     * condition takes the following form:
+     * {@code (ID in (?,?,?) or ID in (?,?,?) or ID in (?,?))}
+     *
+     * @param builder the condition will be appended here
+     * @param field name of the field
+     * @param placeholdersCount how many ? placeholders should be included
+     * @param maxListLength the maximum number of ? placeholders in one list
+     */
+    static void appendInCondition(StringBuilder builder, String field, int placeholdersCount, int maxListLength) {
+        if (placeholdersCount <= 0) {
+            return;
+        }
+
+        if (placeholdersCount > maxListLength) {
+            builder.append('(');
+        }
+
+        int i;
+        for (i = 0; i < placeholdersCount / maxListLength; i++) {
+            if (i > 0) {
+                builder.append(" or ");
+            }
+            appendInCondition(builder, field, maxListLength);
+        }
+
+        if (placeholdersCount % maxListLength > 0) {
+            if (i > 0) {
+                builder.append(" or ");
+            }
+            appendInCondition(builder, field, placeholdersCount % maxListLength);
+        }
+
+        if (placeholdersCount > maxListLength) {
+            builder.append(')');
+        }
+    }
+
+    private static void appendInCondition(StringBuilder builder, String field, int placeholdersCount) {
+        builder.append(field).append(" in (");
+        Joiner.on(',').appendTo(builder, limit(cycle('?'), placeholdersCount));
+        builder.append(')');
+    }
+
     private static String getIdFromRS(RDBTableMetaData tmd, ResultSet rs, int idx) throws SQLException {
         if (tmd.isIdBinary()) {
             try {
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java
new file mode 100644
index 0000000..03af6e0
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.rdb;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class RDBDocumentStoreJDBCTest {
+
+    @Test
+    public void testAppendInCondition() {
+        assertEquals("", appendInCondition("ID", 0, 1000));
+        assertEquals("ID in (?,?,?)", appendInCondition("ID", 3, 1000));
+        assertEquals("(ID in (?,?,?) or ID in (?,?,?) or ID in (?,?,?))", appendInCondition("ID", 9, 3));
+        assertEquals("(ID in (?,?,?) or ID in (?,?,?) or ID in (?,?,?) or ID in (?,?))", appendInCondition("ID", 11, 3));
+    }
+
+    private String appendInCondition(String field, int placeholdersCount, int maxListLength) {
+        StringBuilder builder = new StringBuilder();
+        RDBDocumentStoreJDBC.appendInCondition(builder, field, placeholdersCount, maxListLength);
+        return builder.toString();
+    }
+}
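Two boundary cases might be worth adding to this test (sketch, using the same private helper): a count equal to maxListLength should yield a single unparenthesized list, and a count one past it should yield the first split:

    assertEquals("ID in (?,?,?)", appendInCondition("ID", 3, 3));
    assertEquals("(ID in (?,?,?) or ID in (?))", appendInCondition("ID", 4, 3));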