Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (working copy) @@ -16,6 +16,12 @@ */ package org.apache.jackrabbit.core.persistence.bundle; +import java.io.DataInputStream; +import java.io.InputStream; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.persistence.PMContext; import org.apache.jackrabbit.core.persistence.bundle.util.DbNameIndex; @@ -26,12 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataInputStream; -import java.io.InputStream; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - /** * Extends the {@link BundleDbPersistenceManager} by PostgreSQL specific code. *

@@ -77,7 +77,7 @@ * @throws java.sql.SQLException if an SQL error occurs. */ protected DbNameIndex createDbNameIndex() throws SQLException { - return new PostgreSQLNameIndex(connectionManager, schemaObjectPrefix); + return new PostgreSQLNameIndex(dataSource, schemaObjectPrefix); } /** @@ -90,8 +90,9 @@ protected synchronized NodePropBundle loadBundle(NodeId id) throws ItemStateException { - try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + BundleDbContext context = createContext(); + try { + Statement stmt = context.getConnectionHelper().executeStmt(bundleSelectSQL, getKey(id.getUUID())); ResultSet rs = stmt.getResultSet(); try { if (rs.next()) { @@ -115,6 +116,8 @@ String msg = "failed to read bundle: " + id + ": " + e; log.error(msg); throw new ItemStateException(msg, e); + } finally { + context.closeSilently(); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java (working copy) @@ -103,7 +103,15 @@ * the name of this persistence manager */ private String name = super.toString(); - + + protected Context createContext() { + return new Context() { + @Override + void closeSilently() { + + } + }; + } /** * Returns the configured block size of the blob cqfs @@ -236,7 +244,7 @@ /** * {@inheritDoc} */ - protected synchronized NodePropBundle loadBundle(NodeId id) + protected synchronized NodePropBundle loadBundle(NodeId id, Context context) throws ItemStateException { DataInputStream din = null; try { @@ -262,7 +270,7 @@ /** * {@inheritDoc} */ - protected synchronized boolean existsBundle(NodeId id) throws ItemStateException { + protected synchronized boolean existsBundle(NodeId id, Context context) throws ItemStateException { try { StringBuffer buf = buildNodeFilePath(null, id); return itemFs.exists(buf.toString()); @@ -313,7 +321,7 @@ /** * {@inheritDoc} */ - protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException { + protected synchronized void storeBundle(NodePropBundle bundle, Context context) throws ItemStateException { try { StringBuffer buf = buildNodeFolderPath(null, bundle.getId()); buf.append('.'); @@ -337,7 +345,7 @@ /** * {@inheritDoc} */ - protected synchronized void destroyBundle(NodePropBundle bundle) throws ItemStateException { + protected synchronized void destroyBundle(NodePropBundle bundle, Context context) throws ItemStateException { try { StringBuffer buf = buildNodeFilePath(null, bundle.getId()); itemFs.deleteFile(buf.toString()); @@ -384,7 +392,7 @@ /** * {@inheritDoc} */ - public synchronized void store(NodeReferences refs) + public synchronized void store(NodeReferences refs, Context context) throws ItemStateException { if (!initialized) { throw new IllegalStateException("not initialized"); @@ -411,7 +419,7 @@ /** * {@inheritDoc} */ - public synchronized void destroy(NodeReferences refs) throws ItemStateException { + public synchronized void destroy(NodeReferences refs, Context context) throws ItemStateException { if (!initialized) { throw new IllegalStateException("not initialized"); } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (working copy) @@ -19,7 +19,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.jackrabbit.core.persistence.PMContext; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionHelper; +import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; @@ -257,11 +259,14 @@ /** * {@inheritDoc} */ - protected void checkSchema() throws SQLException, RepositoryException { - // set properties + protected void checkSchema() throws SQLException, RepositoryException { + // set properties if (DERBY_EMBEDDED_DRIVER.equals(getDriver())) { - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = null; + Statement stmt = null; try { + connection = dataSource.getConnection(); + stmt = connection.createStatement(); stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY " + "('derby.storage.initialPages', '" + derbyStorageInitialPages + "')"); stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY " @@ -274,7 +279,8 @@ + "('derby.storage.pageSize', '" + derbyStoragePageSize + "')"); } finally { - stmt.close(); + ConnectionHelper.closeSilently(stmt); + ConnectionHelper.closeSilently(connection); } } super.checkSchema(); @@ -306,7 +312,22 @@ } // prepare connection url for issuing shutdown command - String url = connectionManager.getConnection().getMetaData().getURL(); + String url = null; + Connection connection = null; + + try { + connection = dataSource.getConnection(); + url = connection.getMetaData().getURL(); + + // we have to reset the connection to 'autoCommit=true' before closing it; + // otherwise Derby would mysteriously complain about some pending uncommitted + // changes which can't possibly be true. + // @todo further investigate + connection.setAutoCommit(true); + } + finally { + ConnectionHelper.closeSilently(connection); + } int pos = url.lastIndexOf(';'); if (pos != -1) { // strip any attributes from connection url @@ -314,12 +335,6 @@ } url += ";shutdown=true"; - // we have to reset the connection to 'autoCommit=true' before closing it; - // otherwise Derby would mysteriously complain about some pending uncommitted - // changes which can't possibly be true. - // @todo further investigate - connectionManager.getConnection().setAutoCommit(true); - super.close(); // now it's safe to shutdown the embedded Derby database Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (working copy) @@ -16,10 +16,12 @@ */ package org.apache.jackrabbit.core.persistence.bundle; +import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.SQLException; import org.apache.jackrabbit.core.persistence.PMContext; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionHelper; import org.apache.jackrabbit.core.persistence.bundle.util.DbNameIndex; import org.apache.jackrabbit.core.persistence.bundle.util.NGKDbNameIndex; import org.apache.jackrabbit.util.Text; @@ -104,8 +106,9 @@ super.init(context); // check driver version + Connection connection = dataSource.getConnection(); try { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); + DatabaseMetaData metaData = connection.getMetaData(); if (metaData.getDriverMajorVersion() < 10) { // Oracle drivers prior to version 10 only support // writing BLOBs up to 32k in size... @@ -115,6 +118,8 @@ } } catch (SQLException e) { log.warn("Can not retrieve driver version", e); + } finally { + ConnectionHelper.closeSilently(connection); } } @@ -124,7 +129,7 @@ * @throws SQLException if an SQL error occurs. */ protected DbNameIndex createDbNameIndex() throws SQLException { - return new NGKDbNameIndex(connectionManager, schemaObjectPrefix); + return new NGKDbNameIndex(dataSource, schemaObjectPrefix); } /** @@ -159,8 +164,16 @@ * @inheritDoc */ protected void prepareSchemaObjectPrefix() throws Exception { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String legalChars = metaData.getExtraNameCharacters(); + String legalChars; + Connection connection = null; + try { + connection = dataSource.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + legalChars = metaData.getExtraNameCharacters(); + } finally { + ConnectionHelper.closeSilently(connection); + } + legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; String prefix = schemaObjectPrefix.toUpperCase(); Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (working copy) @@ -39,6 +39,7 @@ import java.util.NoSuchElementException; import javax.jcr.RepositoryException; +import javax.sql.DataSource; import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.core.NodeId; @@ -49,11 +50,12 @@ import org.apache.jackrabbit.core.fs.local.LocalFileSystem; import org.apache.jackrabbit.core.persistence.PMContext; import org.apache.jackrabbit.core.persistence.bundle.util.BundleBinding; -import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionFactory; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionHelper; import org.apache.jackrabbit.core.persistence.bundle.util.DbNameIndex; import org.apache.jackrabbit.core.persistence.bundle.util.ErrorHandling; import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle; -import org.apache.jackrabbit.core.util.StringIndex; +import org.apache.jackrabbit.core.persistence.bundle.util.StreamWrapper; import org.apache.jackrabbit.core.persistence.util.BLOBStore; import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore; import org.apache.jackrabbit.core.persistence.util.Serializer; @@ -62,6 +64,7 @@ import org.apache.jackrabbit.core.state.NoSuchItemStateException; import org.apache.jackrabbit.core.state.NodeReferences; import org.apache.jackrabbit.core.state.NodeReferencesId; +import org.apache.jackrabbit.core.util.StringIndex; import org.apache.jackrabbit.util.Text; import org.apache.jackrabbit.uuid.UUID; import org.slf4j.Logger; @@ -119,6 +122,8 @@ /** the database type */ protected String databaseType; + + protected DataSource dataSource; /** the prefix for the database objects */ protected String schemaObjectPrefix; @@ -138,11 +143,6 @@ /** indicates whether to block if the database connection is lost */ protected boolean blockOnConnectionLoss; - /** - * The class that manages statement execution and recovery from connection loss. - */ - protected ConnectionRecoveryManager connectionManager; - // SQL statements for bundle management protected String bundleInsertSQL; protected String bundleUpdateSQL; @@ -184,6 +184,53 @@ */ private String name = super.toString(); + protected class BundleDbContext extends Context + { + /** + * The class that manages statement execution and recovery from connection loss. + */ + + public BundleDbContext() { + + } + + private Connection connection; + + /** + * Lazily intiialize database connection + * @return + * @throws SQLException + * @throws RepositoryException + */ + public Connection getConnection() throws SQLException, RepositoryException + { + if (connection == null) + { + connection = dataSource.getConnection(); + } + return connection; + } + + public ConnectionHelper getConnectionHelper() throws SQLException, RepositoryException + { + return new ConnectionHelper(getConnection()); + } + + public void closeSilently(Statement statement) + { + ConnectionHelper.closeSilently(statement); + } + + public void closeSilently() + { + ConnectionHelper.closeSilently(connection); + } + }; + + @Override + protected BundleDbContext createContext() { + return new BundleDbContext(); + } /** * Returns the configured JDBC connection url. @@ -448,9 +495,12 @@ throw new RepositoryException(msg); } BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - Statement stmt = connectionManager.getConnection().createStatement(); + BundleDbContext context = createContext(); + + Statement stmt = null; String sql = null; try { + stmt = context.getConnection().createStatement(); sql = reader.readLine(); while (sql != null) { if (!sql.startsWith("#") && sql.length() > 0 @@ -474,7 +524,8 @@ throw se; } finally { IOUtils.closeQuietly(in); - stmt.close(); + context.closeSilently(stmt); + context.closeSilently(); } } } @@ -500,20 +551,25 @@ * @throws RepositoryException if a repository exception occurs. */ protected boolean checkTablesExist() throws SQLException, RepositoryException { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String tableName = schemaObjectPrefix + "BUNDLE"; - if (metaData.storesLowerCaseIdentifiers()) { - tableName = tableName.toLowerCase(); - } else if (metaData.storesUpperCaseIdentifiers()) { - tableName = tableName.toUpperCase(); - } - String userName = checkTablesWithUser() ? metaData.getUserName() : null; - ResultSet rs = metaData.getTables(null, userName, tableName, null); - try { - return rs.next(); - } finally { - rs.close(); - } + BundleDbContext context = createContext(); + try { + DatabaseMetaData metaData = context.getConnection().getMetaData(); + String tableName = schemaObjectPrefix + "BUNDLE"; + if (metaData.storesLowerCaseIdentifiers()) { + tableName = tableName.toLowerCase(); + } else if (metaData.storesUpperCaseIdentifiers()) { + tableName = tableName.toUpperCase(); + } + String userName = checkTablesWithUser() ? metaData.getUserName() : null; + ResultSet rs = metaData.getTables(null, userName, tableName, null); + try { + return rs.next(); + } finally { + rs.close(); + } + } finally { + context.closeSilently(); + } } /** @@ -534,17 +590,17 @@ * * Basically wraps a JDBC transaction around super.store(). */ - public synchronized void store(ChangeLog changeLog) throws ItemStateException { + public synchronized void store(ChangeLog changeLog, Context context) throws ItemStateException { int trials = 2; - Throwable lastException = null; + Throwable lastException = null; do { trials--; Connection con = null; + try { - con = connectionManager.getConnection(); - connectionManager.setAutoReconnect(false); + con = ((BundleDbContext)context).getConnection(); con.setAutoCommit(false); - super.store(changeLog); + super.store(changeLog, context); con.commit(); con.setAutoCommit(true); return; @@ -556,13 +612,8 @@ } } catch (SQLException e) { logException("rollback failed", e); - } - if (th instanceof SQLException || th.getCause() instanceof SQLException) { - connectionManager.close(); - } - } finally { - connectionManager.setAutoReconnect(true); - } + } + } } while(blockOnConnectionLoss || trials > 0); throw new ItemStateException(lastException.getMessage()); } @@ -576,10 +627,9 @@ } super.init(context); - this.name = context.getHomeDir().getName(); - - connectionManager = new ConnectionRecoveryManager(blockOnConnectionLoss, - getDriver(), getUrl(), getUser(), getPassword()); + dataSource = ConnectionFactory.getDataSource(getDriver(), getUrl(), getUser(), getPassword()); + + this.name = context.getHomeDir().getName(); // make sure schemaObjectPrefix consists of legal name characters only prepareSchemaObjectPrefix(); @@ -602,6 +652,7 @@ // check all bundles checkConsistency(null, true, consistencyFix); } + } /** @@ -655,7 +706,7 @@ * @throws SQLException if an SQL error occurs. */ protected DbNameIndex createDbNameIndex() throws SQLException { - return new DbNameIndex(connectionManager, schemaObjectPrefix); + return new DbNameIndex(dataSource, schemaObjectPrefix); } /** @@ -711,7 +762,8 @@ * {@linkplain NodePropBundle bundles} here */ protected void checkBundleConsistency(NodeId id, NodePropBundle bundle, - boolean fix, Collection modifications) { + boolean fix, Collection modifications, + BundleDbContext context) { //log.info(name + ": checking bundle '" + id + "'"); // look at the node's children @@ -728,7 +780,7 @@ try { // analyze child node bundles - NodePropBundle child = loadBundle(entry.getId(), true); + NodePropBundle child = loadBundle(entry.getId(), true, context); if (child == null) { log.error( "NodeState '" + id + "' references inexistent child" @@ -760,7 +812,7 @@ try { // skip root nodes (that point to itself) if (parentId != null && !id.toString().endsWith("babecafebabe")) { - if (!existsBundle(parentId)) { + if (!existsBundle(parentId, context)) { log.error("NodeState '" + id + "' references inexistent parent uuid '" + parentId + "'"); } } @@ -769,23 +821,18 @@ } } - /** - * {@inheritDoc} - */ - public void checkConsistency(String[] uuids, boolean recursive, boolean fix) { - log.info("{}: checking workspace consistency...", name); - + public void checkConsistency(String[] uuids, boolean recursive, boolean fix, BundleDbContext context) { int count = 0; int total = 0; - Collection modifications = new ArrayList(); - + Collection modifications = new ArrayList(); + if (uuids == null) { // get all node bundles in the database with a single sql statement, - // which is (probably) faster than loading each bundle and traversing the tree + // which is (probably) faster than loading each bundle and traversing the tree ResultSet rs = null; - try { + try { String sql = "select count(*) from " + schemaObjectPrefix + "BUNDLE"; - Statement stmt = connectionManager.executeStmt(sql, new Object[0]); + Statement stmt = context.getConnectionHelper().executeStmt(sql, new Object[0]); try { rs = stmt.getResultSet(); if (!rs.next()) { @@ -801,7 +848,7 @@ } else { sql = "select NODE_ID_HI, NODE_ID_LO from " + schemaObjectPrefix + "BUNDLE"; } - stmt = connectionManager.executeStmt(sql, new Object[0]); + stmt = context.getConnectionHelper().executeStmt(sql, new Object[0]); rs = stmt.getResultSet(); // iterate over all node bundles in the db @@ -817,7 +864,7 @@ ResultSet bRs = null; byte[] data = null; try { - Statement bSmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + Statement bSmt = context.getConnectionHelper().executeStmt(bundleSelectSQL, getKey(id.getUUID())); bRs = bSmt.getResultSet(); if (!bRs.next()) { throw new SQLException("bundle cannot be retrieved?"); @@ -837,7 +884,7 @@ // reset stream for readBundle() din = new DataInputStream(new ByteArrayInputStream(data)); NodePropBundle bundle = binding.readBundle(din, id); - checkBundleConsistency(id, bundle, fix, modifications); + checkBundleConsistency(id, bundle, fix, modifications, context); } else { log.error("invalid bundle '" + id + "', see previous BundleBinding error log entry"); } @@ -851,7 +898,7 @@ } } catch (Exception e) { log.error("Error loading bundle", e); - } finally { + } finally { closeResultSet(rs); total = count; } @@ -873,21 +920,21 @@ log.error("Invalid uuid for consistency check, skipping: '" + uuids[i] + "': " + e); } } - + // iterate over UUIDs (including ones that are newly added inside the loop!) for (int i = 0; i < uuidList.size(); i++) { final UUID uuid = (UUID) uuidList.get(i); try { // load the node from the database NodeId id = new NodeId(uuid); - NodePropBundle bundle = loadBundle(id, true); + NodePropBundle bundle = loadBundle(id, true, context); if (bundle == null) { log.error("No bundle found for uuid '" + uuid + "'"); continue; } - checkBundleConsistency(id, bundle, fix, modifications); + checkBundleConsistency(id, bundle, fix, modifications, context); if (recursive) { for (NodePropBundle.ChildNodeEntry entry : bundle.getChildNodeEntries()) { @@ -914,7 +961,7 @@ try { log.info(name + ": Fixing bundle '" + bundle.getId() + "'"); bundle.markOld(); // use UPDATE instead of INSERT - storeBundle(bundle); + storeBundle(bundle, context); evictBundle(bundle.getId()); } catch (ItemStateException e) { log.error(name + ": Error storing fixed bundle: " + e); @@ -924,6 +971,20 @@ log.info(name + ": checked " + count + "/" + total + " bundles."); } + + /** + * {@inheritDoc} + */ + public void checkConsistency(String[] uuids, boolean recursive, boolean fix) { + log.info("{}: checking workspace consistency...", name); + + BundleDbContext context = createContext(); + try { + checkConsistency(uuids, recursive, fix, context); + } finally { + context.closeSilently(); + } + } /** * Makes sure that schemaObjectPrefix does only consist of @@ -933,25 +994,30 @@ * @throws Exception if an error occurs */ protected void prepareSchemaObjectPrefix() throws Exception { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String legalChars = metaData.getExtraNameCharacters(); - legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; - - String prefix = schemaObjectPrefix.toUpperCase(); - StringBuffer escaped = new StringBuffer(); - for (int i = 0; i < prefix.length(); i++) { - char c = prefix.charAt(i); - if (legalChars.indexOf(c) == -1) { - escaped.append("_x"); - String hex = Integer.toHexString(c); - escaped.append("0000".toCharArray(), 0, 4 - hex.length()); - escaped.append(hex); - escaped.append("_"); - } else { - escaped.append(c); - } - } - schemaObjectPrefix = escaped.toString(); + BundleDbContext context = createContext(); + try { + DatabaseMetaData metaData = context.getConnection().getMetaData(); + String legalChars = metaData.getExtraNameCharacters(); + legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; + + String prefix = schemaObjectPrefix.toUpperCase(); + StringBuffer escaped = new StringBuffer(); + for (int i = 0; i < prefix.length(); i++) { + char c = prefix.charAt(i); + if (legalChars.indexOf(c) == -1) { + escaped.append("_x"); + String hex = Integer.toHexString(c); + escaped.append("0000".toCharArray(), 0, 4 - hex.length()); + escaped.append(hex); + escaped.append("_"); + } else { + escaped.append(c); + } + } + schemaObjectPrefix = escaped.toString(); + } finally { + context.closeSilently(); + } } /** @@ -965,8 +1031,7 @@ try { if (nameIndex instanceof DbNameIndex) { ((DbNameIndex) nameIndex).close(); - } - connectionManager.close(); + } // close blob store blobStore.close(); blobStore = null; @@ -1052,6 +1117,7 @@ public synchronized NodeIdIterator getAllNodeIds(NodeId bigger, int maxCount) throws ItemStateException, RepositoryException { ResultSet rs = null; + BundleDbContext context = createContext(); try { UUID lowUuid; Object[] keys; @@ -1072,7 +1138,7 @@ // see also bundleSelectAllIdsFrom SQL statement maxCount += 10; } - Statement stmt = connectionManager.executeStmt(sql, keys, false, maxCount); + Statement stmt = context.getConnectionHelper().executeStmt(sql, keys, false, maxCount); rs = stmt.getResultSet(); ArrayList result = new ArrayList(); while ((maxCount == 0 || result.size() < maxCount) && rs.next()) { @@ -1099,15 +1165,16 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + context.closeSilently(); } } /** * {@inheritDoc} */ - protected synchronized NodePropBundle loadBundle(NodeId id) + protected synchronized NodePropBundle loadBundle(NodeId id, Context context) throws ItemStateException { - return loadBundle(id, false); + return loadBundle(id, false, (BundleDbContext)context); } /** @@ -1145,11 +1212,12 @@ * exist. * @throws ItemStateException if an error while loading occurs. */ - protected synchronized NodePropBundle loadBundle(NodeId id, boolean checkBeforeLoading) + protected synchronized NodePropBundle loadBundle(NodeId id, boolean checkBeforeLoading, BundleDbContext context) throws ItemStateException { ResultSet rs = null; + try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + Statement stmt = context.getConnectionHelper().executeStmt(bundleSelectSQL, getKey(id.getUUID())); rs = stmt.getResultSet(); if (!rs.next()) { return null; @@ -1183,10 +1251,11 @@ /** * {@inheritDoc} */ - protected synchronized boolean existsBundle(NodeId id) throws ItemStateException { + protected synchronized boolean existsBundle(NodeId id, Context context) throws ItemStateException { ResultSet rs = null; try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + ConnectionHelper helper = ((BundleDbContext)context).getConnectionHelper(); + Statement stmt = helper.executeStmt(bundleSelectSQL, getKey(id.getUUID())); rs = stmt.getResultSet(); // a bundle exists, if the result has at least one entry return rs.next(); @@ -1202,7 +1271,7 @@ /** * {@inheritDoc} */ - protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException { + protected synchronized void storeBundle(NodePropBundle bundle, Context context) throws ItemStateException { try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); DataOutputStream dout = new DataOutputStream(out); @@ -1211,7 +1280,7 @@ String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL; Object[] params = createParams(bundle.getId().getUUID(), out.toByteArray(), true); - connectionManager.executeStmt(sql, params); + ((BundleDbContext)context).getConnectionHelper().executeStmt(sql, params); } catch (Exception e) { String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); @@ -1222,9 +1291,10 @@ /** * {@inheritDoc} */ - protected synchronized void destroyBundle(NodePropBundle bundle) throws ItemStateException { + protected synchronized void destroyBundle(NodePropBundle bundle, Context context) throws ItemStateException { try { - connectionManager.executeStmt(bundleDeleteSQL, getKey(bundle.getId().getUUID())); + ConnectionHelper helper = ((BundleDbContext)context).getConnectionHelper(); + helper.executeStmt(bundleDeleteSQL, getKey(bundle.getId().getUUID())); } catch (Exception e) { if (e instanceof NoSuchItemStateException) { throw (NoSuchItemStateException) e; @@ -1246,8 +1316,9 @@ ResultSet rs = null; InputStream in = null; + BundleDbContext context = createContext(); try { - Statement stmt = connectionManager.executeStmt( + Statement stmt = context.getConnectionHelper().executeStmt( nodeReferenceSelectSQL, getKey(targetId.getTargetId().getUUID())); rs = stmt.getResultSet(); if (!rs.next()) { @@ -1269,6 +1340,7 @@ } finally { IOUtils.closeQuietly(in); closeResultSet(rs); + context.closeSilently(); } } @@ -1281,7 +1353,7 @@ * shared statement. If the method would not be synchronized, the shared * statement must be synchronized. */ - public synchronized void store(NodeReferences refs) throws ItemStateException { + public synchronized void store(NodeReferences refs, Context context) throws ItemStateException { if (!initialized) { throw new IllegalStateException("not initialized"); } @@ -1297,7 +1369,7 @@ Serializer.serialize(refs, out); Object[] params = createParams(refs.getTargetId().getUUID(), out.toByteArray(), true); - connectionManager.executeStmt(sql, params); + ((BundleDbContext)context).getConnectionHelper().executeStmt(sql, params); // there's no need to close a ByteArrayOutputStream //out.close(); @@ -1311,13 +1383,14 @@ /** * {@inheritDoc} */ - public synchronized void destroy(NodeReferences refs) throws ItemStateException { + public synchronized void destroy(NodeReferences refs, Context context) throws ItemStateException { if (!initialized) { throw new IllegalStateException("not initialized"); } try { - connectionManager.executeStmt(nodeReferenceDeleteSQL, + ConnectionHelper helper = ((BundleDbContext)context).getConnectionHelper(); + helper.executeStmt(nodeReferenceDeleteSQL, getKey(refs.getTargetId().getUUID())); } catch (Exception e) { if (e instanceof NoSuchItemStateException) { @@ -1338,8 +1411,10 @@ } ResultSet rs = null; + BundleDbContext context = createContext(); try { - Statement stmt = connectionManager.executeStmt(nodeReferenceSelectSQL, + ConnectionHelper helper = context.getConnectionHelper(); + Statement stmt = helper.executeStmt(nodeReferenceSelectSQL, getKey(targetId.getTargetId().getUUID())); rs = stmt.getResultSet(); @@ -1352,6 +1427,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + context.closeSilently(); } } @@ -1538,31 +1614,46 @@ * {@inheritDoc} */ public InputStream get(String blobId) throws Exception { - Statement stmt = connectionManager.executeStmt(blobSelectSQL, new Object[]{blobId}); - final ResultSet rs = stmt.getResultSet(); - if (!rs.next()) { - closeResultSet(rs); - throw new Exception("no such BLOB: " + blobId); - } - InputStream in = rs.getBinaryStream(1); - if (in == null) { - // some databases treat zero-length values as NULL; - // return empty InputStream in such a case - closeResultSet(rs); - return new ByteArrayInputStream(new byte[0]); - } - - /** - * return an InputStream wrapper in order to - * close the ResultSet when the stream is closed - */ - return new FilterInputStream(in) { - public void close() throws IOException { - in.close(); - // now it's safe to close ResultSet + final BundleDbContext context = createContext(); + try + { + Statement stmt = context.getConnectionHelper().executeStmt(blobSelectSQL, new Object[]{blobId}); + + final ResultSet rs = stmt.getResultSet(); + if (!rs.next()) { + closeResultSet(rs); + throw new Exception("no such BLOB: " + blobId); + } + + InputStream in = rs.getBinaryStream(1); + if (in == null) { + // some databases treat zero-length values as NULL; + // return empty InputStream in such a case closeResultSet(rs); + context.closeSilently(); + return new ByteArrayInputStream(new byte[0]); } - }; + + /** + * return an InputStream wrapper in order to + * close the ResultSet when the stream is closed + */ + return new FilterInputStream(in) { + public void close() throws IOException { + try { + in.close(); + // now it's safe to close ResultSet + closeResultSet(rs); + } finally { + context.closeSilently(); + } + } + }; + + } catch (Exception e) { + context.closeSilently(); + throw (e); + } } /** @@ -1570,23 +1661,33 @@ */ public synchronized void put(String blobId, InputStream in, long size) throws Exception { - Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId}); - ResultSet rs = stmt.getResultSet(); - // a BLOB exists if the result has at least one entry - boolean exists = rs.next(); - closeResultSet(rs); - - String sql = (exists) ? blobUpdateSQL : blobInsertSQL; - Object[] params = new Object[]{new ConnectionRecoveryManager.StreamWrapper(in, size), blobId}; - connectionManager.executeStmt(sql, params); + BundleDbContext context = createContext(); + try { + Statement stmt = context.getConnectionHelper().executeStmt(blobSelectExistSQL, new Object[]{blobId}); + ResultSet rs = stmt.getResultSet(); + // a BLOB exists if the result has at least one entry + boolean exists = rs.next(); + closeResultSet(rs); + + String sql = (exists) ? blobUpdateSQL : blobInsertSQL; + Object[] params = new Object[]{new StreamWrapper(in, size), blobId}; + context.getConnectionHelper().executeStmt(sql, params); + } finally { + context.closeSilently(); + } } /** * {@inheritDoc} */ public synchronized boolean remove(String blobId) throws Exception { - Statement stmt = connectionManager.executeStmt(blobDeleteSQL, new Object[]{blobId}); - return stmt.getUpdateCount() == 1; + BundleDbContext context = createContext(); + try { + Statement stmt = context.getConnectionHelper().executeStmt(blobDeleteSQL, new Object[]{blobId}); + return stmt.getUpdateCount() == 1; + } finally { + context.closeSilently(); + } } public void close() { Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (working copy) @@ -16,13 +16,15 @@ */ package org.apache.jackrabbit.core.persistence.bundle; -import org.apache.jackrabbit.core.persistence.PMContext; - +import java.sql.Connection; +import java.sql.SQLException; import java.sql.Statement; -import java.sql.SQLException; import javax.jcr.RepositoryException; +import org.apache.jackrabbit.core.persistence.PMContext; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionHelper; + /** * Extends the {@link BundleDbPersistenceManager} by H2 specific code. *

@@ -95,11 +97,16 @@ * {@inheritDoc} */ protected void checkSchema() throws SQLException, RepositoryException { - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = null; + Statement stmt = null; + try { + connection = dataSource.getConnection(); + stmt = connection.createStatement(); stmt.execute("SET LOCK_TIMEOUT " + lockTimeout); } finally { - stmt.close(); + ConnectionHelper.closeSilently(stmt); + ConnectionHelper.closeSilently(connection); } super.checkSchema(); } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/AbstractBundlePersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/AbstractBundlePersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/AbstractBundlePersistenceManager.java (working copy) @@ -16,42 +16,42 @@ */ package org.apache.jackrabbit.core.persistence.bundle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.jackrabbit.core.fs.FileSystemResource; -import org.apache.jackrabbit.core.fs.FileSystem; -import org.apache.jackrabbit.core.state.ItemState; -import org.apache.jackrabbit.core.state.ChangeLog; -import org.apache.jackrabbit.core.state.ItemStateException; -import org.apache.jackrabbit.core.state.NodeReferences; -import org.apache.jackrabbit.core.state.NoSuchItemStateException; -import org.apache.jackrabbit.core.state.NodeReferencesId; -import org.apache.jackrabbit.core.state.PropertyState; -import org.apache.jackrabbit.core.state.NodeState; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jcr.PropertyType; + import org.apache.jackrabbit.core.ItemId; +import org.apache.jackrabbit.core.NamespaceRegistryImpl; import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.PropertyId; -import org.apache.jackrabbit.core.NamespaceRegistryImpl; +import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemResource; import org.apache.jackrabbit.core.nodetype.PropDefId; -import org.apache.jackrabbit.core.value.InternalValue; import org.apache.jackrabbit.core.persistence.IterablePersistenceManager; import org.apache.jackrabbit.core.persistence.PMContext; import org.apache.jackrabbit.core.persistence.PersistenceManager; -import org.apache.jackrabbit.core.util.StringIndex; -import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle; +import org.apache.jackrabbit.core.persistence.bundle.util.BundleBinding; import org.apache.jackrabbit.core.persistence.bundle.util.BundleCache; +import org.apache.jackrabbit.core.persistence.bundle.util.HashMapIndex; import org.apache.jackrabbit.core.persistence.bundle.util.LRUNodeIdCache; -import org.apache.jackrabbit.core.persistence.bundle.util.HashMapIndex; -import org.apache.jackrabbit.core.persistence.bundle.util.BundleBinding; +import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle; +import org.apache.jackrabbit.core.state.ChangeLog; +import org.apache.jackrabbit.core.state.ItemState; +import org.apache.jackrabbit.core.state.ItemStateException; +import org.apache.jackrabbit.core.state.NoSuchItemStateException; +import org.apache.jackrabbit.core.state.NodeReferences; +import org.apache.jackrabbit.core.state.NodeReferencesId; +import org.apache.jackrabbit.core.state.NodeState; +import org.apache.jackrabbit.core.state.PropertyState; +import org.apache.jackrabbit.core.util.StringIndex; +import org.apache.jackrabbit.core.value.InternalValue; import org.apache.jackrabbit.spi.Name; import org.apache.jackrabbit.spi.commons.name.NameConstants; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import javax.jcr.PropertyType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The AbstractBundlePersistenceManager acts as base for all @@ -145,6 +145,27 @@ } /** + * Holds "transactional" information for each call. This is used ensure that nested calls use + * same database connection as the caller methods. + * + */ + protected abstract class Context + { + /** + * Method that creates context must call this to clean up. + */ + abstract void closeSilently(); + }; + + /** + * Creates new context object. Subclass can use this to propagate "transactional" information to + * nested methods. + * + * @return new context instance + */ + protected abstract Context createContext(); + + /** * Creates the folder path for the given node id that is suitable for * storing states in a filesystem. * @@ -336,7 +357,7 @@ * exist. * @throws ItemStateException if an error while loading occurs. */ - protected abstract NodePropBundle loadBundle(NodeId id) + protected abstract NodePropBundle loadBundle(NodeId id, Context context) throws ItemStateException; /** @@ -347,7 +368,7 @@ * false otherwise. * @throws ItemStateException if an error while checking occurs. */ - protected abstract boolean existsBundle(NodeId id) + protected abstract boolean existsBundle(NodeId id, Context context) throws ItemStateException; /** @@ -356,7 +377,7 @@ * @param bundle the bundle to store * @throws ItemStateException if an error while storing occurs. */ - protected abstract void storeBundle(NodePropBundle bundle) + protected abstract void storeBundle(NodePropBundle bundle, Context context) throws ItemStateException; /** @@ -366,7 +387,7 @@ * * @throws ItemStateException if an error while destroying occurs. */ - protected abstract void destroyBundle(NodePropBundle bundle) + protected abstract void destroyBundle(NodePropBundle bundle, Context context) throws ItemStateException; /** @@ -381,7 +402,7 @@ * @param refs the node references to destroy. * @throws ItemStateException if an error while destroying occurs. */ - protected abstract void destroy(NodeReferences refs) + protected abstract void destroy(NodeReferences refs, Context context) throws ItemStateException; /** @@ -390,7 +411,7 @@ * @param refs the node references to store. * @throws ItemStateException if an error while storing occurs. */ - protected abstract void store(NodeReferences refs) + protected abstract void store(NodeReferences refs, Context context) throws ItemStateException; /** @@ -443,11 +464,16 @@ */ public synchronized NodeState load(NodeId id) throws NoSuchItemStateException, ItemStateException { - NodePropBundle bundle = getBundle(id); - if (bundle == null) { - throw new NoSuchItemStateException(id.toString()); - } - return bundle.createNodeState(this); + Context context = createContext(); + try { + NodePropBundle bundle = getBundle(id, context); + if (bundle == null) { + throw new NoSuchItemStateException(id.toString()); + } + return bundle.createNodeState(this); + } finally { + context.closeSilently(); + } } /** @@ -457,38 +483,43 @@ */ public synchronized PropertyState load(PropertyId id) throws NoSuchItemStateException, ItemStateException { - NodePropBundle bundle = getBundle(id.getParentId()); - if (bundle == null) { - throw new NoSuchItemStateException(id.toString()); - } - PropertyState state = bundle.createPropertyState(this, id.getName()); - if (state == null) { - // check if autocreated property state - if (id.getName().equals(NameConstants.JCR_UUID)) { - state = createNew(id); - state.setType(PropertyType.STRING); - state.setDefinitionId(idJcrUUID); - state.setMultiValued(false); - state.setValues(new InternalValue[]{InternalValue.create(id.getParentId().getUUID().toString())}); - } else if (id.getName().equals(NameConstants.JCR_PRIMARYTYPE)) { - state = createNew(id); - state.setType(PropertyType.NAME); - state.setDefinitionId(idJcrPrimaryType); - state.setMultiValued(false); - state.setValues(new InternalValue[]{InternalValue.create(bundle.getNodeTypeName())}); - } else if (id.getName().equals(NameConstants.JCR_MIXINTYPES)) { - Set mixins = bundle.getMixinTypeNames(); - state = createNew(id); - state.setType(PropertyType.NAME); - state.setDefinitionId(idJcrMixinTypes); - state.setMultiValued(true); - state.setValues(InternalValue.create((Name[]) mixins.toArray(new Name[mixins.size()]))); - } else { - throw new NoSuchItemStateException(id.toString()); - } - bundle.addProperty(state); - } - return state; + Context context = createContext(); + try { + NodePropBundle bundle = getBundle(id.getParentId(), context); + if (bundle == null) { + throw new NoSuchItemStateException(id.toString()); + } + PropertyState state = bundle.createPropertyState(this, id.getName()); + if (state == null) { + // check if autocreated property state + if (id.getName().equals(NameConstants.JCR_UUID)) { + state = createNew(id); + state.setType(PropertyType.STRING); + state.setDefinitionId(idJcrUUID); + state.setMultiValued(false); + state.setValues(new InternalValue[]{InternalValue.create(id.getParentId().getUUID().toString())}); + } else if (id.getName().equals(NameConstants.JCR_PRIMARYTYPE)) { + state = createNew(id); + state.setType(PropertyType.NAME); + state.setDefinitionId(idJcrPrimaryType); + state.setMultiValued(false); + state.setValues(new InternalValue[]{InternalValue.create(bundle.getNodeTypeName())}); + } else if (id.getName().equals(NameConstants.JCR_MIXINTYPES)) { + Set mixins = bundle.getMixinTypeNames(); + state = createNew(id); + state.setType(PropertyType.NAME); + state.setDefinitionId(idJcrMixinTypes); + state.setMultiValued(true); + state.setValues(InternalValue.create((Name[]) mixins.toArray(new Name[mixins.size()]))); + } else { + throw new NoSuchItemStateException(id.toString()); + } + bundle.addProperty(state); + } + return state; + } finally { + context.closeSilently(); + } } /** @@ -497,8 +528,13 @@ * Loads the state via the appropriate NodePropBundle. */ public synchronized boolean exists(PropertyId id) throws ItemStateException { - NodePropBundle bundle = getBundle(id.getParentId()); - return bundle != null && bundle.hasProperty(id.getName()); + Context context = createContext(); + try { + NodePropBundle bundle = getBundle(id.getParentId(), context); + return bundle != null && bundle.hasProperty(id.getName()); + } finally { + context.closeSilently(); + } } /** @@ -507,8 +543,13 @@ * Checks the existence via the appropriate NodePropBundle. */ public synchronized boolean exists(NodeId id) throws ItemStateException { + Context context = createContext(); + try { // anticipating a load followed by a exists - return getBundle(id) != null; + return getBundle(id, context) != null; + } finally { + context.closeSilently(); + } } /** @@ -534,11 +575,11 @@ * * {@inheritDoc} */ - public synchronized void store(ChangeLog changeLog) + public synchronized void store(ChangeLog changeLog, Context context) throws ItemStateException { boolean success = false; try { - storeInternal(changeLog); + storeInternal(changeLog, context); success = true; } finally { if (!success) { @@ -547,6 +588,20 @@ } } } + + /** + * {@inheritDoc} + */ + public final synchronized void store(ChangeLog changeLog) + throws ItemStateException + { + Context context = createContext(); + try { + store(changeLog, context); + } finally { + context.closeSilently(); + } + } /** * Stores the given changelog and updates the bundle cache. @@ -554,7 +609,7 @@ * @param changeLog the changelog to store * @throws ItemStateException on failure */ - private void storeInternal(ChangeLog changeLog) + private void storeInternal(ChangeLog changeLog, Context context) throws ItemStateException { // delete bundles HashSet deleted = new HashSet(); @@ -562,11 +617,11 @@ while (iter.hasNext()) { ItemState state = iter.next(); if (state.isNode()) { - NodePropBundle bundle = getBundle((NodeId) state.getId()); + NodePropBundle bundle = getBundle((NodeId) state.getId(), context); if (bundle == null) { throw new NoSuchItemStateException(state.getId().toString()); } - deleteBundle(bundle); + deleteBundle(bundle, context); deleted.add(state.getId()); } } @@ -588,7 +643,7 @@ NodeId nodeId = (NodeId) state.getId(); NodePropBundle bundle = (NodePropBundle) modified.get(nodeId); if (bundle == null) { - bundle = getBundle(nodeId); + bundle = getBundle(nodeId, context); if (bundle == null) { throw new NoSuchItemStateException(nodeId.toString()); } @@ -606,7 +661,7 @@ NodeId nodeId = id.getParentId(); NodePropBundle bundle = (NodePropBundle) modified.get(nodeId); if (bundle == null) { - bundle = getBundle(nodeId); + bundle = getBundle(nodeId, context); if (bundle == null) { throw new NoSuchItemStateException(nodeId.toString()); } @@ -633,7 +688,7 @@ if (bundle == null) { // should actually not happen log.warn("deleted property state's parent not modified!"); - bundle = getBundle(nodeId); + bundle = getBundle(nodeId, context); if (bundle == null) { throw new NoSuchItemStateException(nodeId.toString()); } @@ -660,7 +715,7 @@ if (bundle == null) { // should actually not happen log.warn("added property state's parent not modified!"); - bundle = getBundle(nodeId); + bundle = getBundle(nodeId, context); if (bundle == null) { throw new NoSuchItemStateException(nodeId.toString()); } @@ -672,7 +727,7 @@ // now store all modified bundles for (NodePropBundle bundle : modified.values()) { - putBundle(bundle); + putBundle(bundle, context); } // store the refs @@ -680,9 +735,9 @@ while (itMod.hasNext()) { NodeReferences refs = itMod.next(); if (refs.hasReferences()) { - store(refs); + store(refs, context); } else { - destroy(refs); + destroy(refs, context); } } } @@ -695,13 +750,13 @@ * * @throws ItemStateException if an error occurs. */ - private NodePropBundle getBundle(NodeId id) throws ItemStateException { + private NodePropBundle getBundle(NodeId id, Context context) throws ItemStateException { if (missing.contains(id)) { return null; } NodePropBundle bundle = bundles.get(id); if (bundle == null) { - bundle = loadBundle(id); + bundle = loadBundle(id, context); if (bundle != null) { bundle.markOld(); bundles.put(bundle); @@ -718,8 +773,8 @@ * @param bundle the bundle to delete * @throws ItemStateException if an error occurs */ - private void deleteBundle(NodePropBundle bundle) throws ItemStateException { - destroyBundle(bundle); + private void deleteBundle(NodePropBundle bundle, Context context) throws ItemStateException { + destroyBundle(bundle, context); bundle.removeAllProperties(); bundles.remove(bundle.getId()); missing.put(bundle.getId()); @@ -731,8 +786,8 @@ * @param bundle the bundle to store * @throws ItemStateException if an error occurs */ - private void putBundle(NodePropBundle bundle) throws ItemStateException { - storeBundle(bundle); + private void putBundle(NodePropBundle bundle, Context context) throws ItemStateException { + storeBundle(bundle, context); bundle.markOld(); log.debug("stored bundle " + bundle.getId()); Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java (revision 0) @@ -0,0 +1,30 @@ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.io.InputStream; + +public class StreamWrapper { + + private final InputStream stream; + private final long size; + + /** + * Creates a wrapper for the given InputStream that can + * safely be passed as a parameter to the executeStmt + * methods in the {@link ConnectionRecoveryManager} class. + * + * @param in the InputStream to wrap + * @param size the size of the input stream + */ + public StreamWrapper(InputStream in, long size) { + this.stream = in; + this.size = size; + } + + public InputStream getStream() { + return stream; + } + + public long getSize() { + return size; + } +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (working copy) @@ -16,9 +16,12 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import javax.sql.DataSource; + /** * Same as {@link DbNameIndex} but does not make use of the * {@link java.sql.Statement#RETURN_GENERATED_KEYS} feature as it is not @@ -28,9 +31,9 @@ protected String generatedKeySelectSQL; - public PostgreSQLNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public PostgreSQLNameIndex(DataSource dataSource, String schemaObjectPrefix) throws SQLException { - super(conMgr, schemaObjectPrefix); + super(dataSource, schemaObjectPrefix); } /** @@ -59,13 +62,17 @@ */ protected int insertString(String string) { // assert index does not exist + Connection connection = null; try { - connectionManager.executeStmt(nameInsertSQL, new Object[]{string}); + connection = dataSource.getConnection(); + new ConnectionHelper(connection).executeStmt(nameInsertSQL, new Object[]{string}); return getGeneratedKey(); - } catch (Exception e) { + } catch (Exception e) { IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string); ise.initCause(e); throw ise; + } finally { + ConnectionHelper.closeSilently(connection); } } @@ -74,8 +81,10 @@ * @return the index. */ protected int getGeneratedKey() { + Connection connection = null; try { - ResultSet rs = connectionManager.executeQuery(generatedKeySelectSQL); + connection = dataSource.getConnection(); + ResultSet rs = new ConnectionHelper(connection).executeQuery(generatedKeySelectSQL); try { if (!rs.next()) { return -1; @@ -89,6 +98,8 @@ IllegalStateException ise = new IllegalStateException("Unable to read generated index"); ise.initCause(e); throw ise; + } finally { + ConnectionHelper.closeSilently(connection); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionHelper.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionHelper.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionHelper.java (revision 0) @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.io.InputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import javax.jcr.RepositoryException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides convenience methods to execute SQL statements. + */ +public class ConnectionHelper { + + private final Connection connection; + + /** + * Creates a new {@link ConnectionHelper} instance + * + * @param connection + */ + public ConnectionHelper(Connection connection) + { + this.connection = connection; + } + + /** + * Executes the given SQL query. Retries once or blocks (when the + * block parameter has been set to true on construction) + * if this fails and autoReconnect is enabled. + * + * @param sql the SQL query to execute + * @return the executed ResultSet + * @throws SQLException on error + * @throws RepositoryException if the database driver could not be loaded + */ + public synchronized ResultSet executeQuery(String sql) throws SQLException, RepositoryException { + return executeQueryInternal(sql); + } + + /** + * Executes the given SQL query. + * + * @param sql query to execute + * @return a ResultSet object + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + private ResultSet executeQueryInternal(String sql) throws SQLException, RepositoryException { + PreparedStatement stmt = null; + try { + stmt = connection.prepareStatement(sql); + return stmt.executeQuery(); + } catch (SQLException e) { + logException("could not execute statement", e); + throw e; + } finally { + resetStatement(stmt); + } + } + + /** + * Resets the given PreparedStatement by clearing the + * parameters and warnings contained. + * + * @param stmt The PreparedStatement to reset. If + * null this method does nothing. + */ + private void resetStatement(PreparedStatement stmt) { + if (stmt != null) { + try { + stmt.clearParameters(); + stmt.clearWarnings(); + } catch (SQLException se) { + logException("Failed resetting PreparedStatement", se); + } + } + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + public PreparedStatement executeStmt(String sql, Object[] params) + throws SQLException, RepositoryException { + return executeStmt(sql, params, false, 0); + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @param returnGeneratedKeys if the statement should return auto generated keys + * @param maxRows the maximum number of rows to return (0 for all rows) + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + public synchronized PreparedStatement executeStmt( + String sql, Object[] params, boolean returnGeneratedKeys, int maxRows) + throws SQLException, RepositoryException { + return executeStmtInternal(sql, params, returnGeneratedKeys, maxRows); + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @param returnGeneratedKeys if the statement should return auto generated keys + * @param maxRows the maximum number of rows to return (0 for all rows) + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + private PreparedStatement executeStmtInternal( + String sql, Object[] params, boolean returnGeneratedKeys, int maxRows) + throws SQLException, RepositoryException { + try { + PreparedStatement stmt; + + if (returnGeneratedKeys) { + stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + } else { + stmt = connection.prepareStatement(sql); + } + + stmt.setMaxRows(maxRows); + return executeStmtInternal(params, stmt); + } catch (SQLException e) { + logException("could not execute statement", e); + throw e; + } + } + + /** + * @param params the parameters for the stmt parameter + * @param stmt the statement to execute + * @return the executed Statement + * @throws SQLException on error + */ + private PreparedStatement executeStmtInternal(Object[] params, PreparedStatement stmt) + throws SQLException { + for (int i = 0; params != null && i < params.length; i++) { + Object p = params[i]; + if (p instanceof StreamWrapper) { + StreamWrapper wrapper = (StreamWrapper) p; + stmt.setBinaryStream(i + 1, wrapper.getStream(), (int) wrapper.getSize()); + } else if (p instanceof InputStream) { + InputStream stream = (InputStream) p; + stmt.setBinaryStream(i + 1, stream, -1); + } else { + stmt.setObject(i + 1, p); + } + } + stmt.execute(); + resetStatement(stmt); + return stmt; + } + + /** + * Logs an sql exception. + * + * @param message the message + * @param se the exception + */ + private void logException(String message, SQLException se) { + message = message == null ? "" : message; + log.error(message + ", reason: " + se.getMessage() + ", state/code: " + + se.getSQLState() + "/" + se.getErrorCode()); + log.debug(" dump:", se); + } + + public static void closeSilently(Statement statement) + { + if (statement != null) + { + try + { + statement.close(); + } + catch (SQLException e) + { + log.error("Error closing statement", e); + } + } + } + + public static void closeSilently(Connection connection) + { + if (connection != null) + { + try + { + connection.close(); + } + catch (SQLException e) + { + log.error("Error closing connection", e); + } + } + } + + private static final Logger log = LoggerFactory.getLogger(ConnectionHelper.class); +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionFactory.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionFactory.java (revision 778788) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionFactory.java (working copy) @@ -74,6 +74,30 @@ return database.getConnection(user, password); } } + + @SuppressWarnings("unchecked") + public static DataSource getDataSource(String driver, String url, String user, String password) + throws RepositoryException, SQLException + { + DataSource database; + + Class driverClass = getDriverClass(driver); + if (driverClass != null + && Context.class.isAssignableFrom(driverClass)) { + database = getJndiDataSource((Class) driverClass, url); + } else { + database = getDriverDataSource(driverClass, url); + } + + if (user == null && password == null) + { + return database; + } + else + { + return new DataSourceWrapper(database, user, password); + } + } /** * Loads and returns the given JDBC driver (or JNDI context) class. Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (working copy) @@ -16,7 +16,11 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; +import java.sql.Connection; import java.sql.SQLException; +import java.sql.Statement; + +import javax.sql.DataSource; /** * Same as {@link DbNameIndex} but does not make use of the @@ -31,9 +35,9 @@ * @param schemaObjectPrefix the prefix for table names * @throws SQLException if the statements cannot be prepared. */ - public NGKDbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public NGKDbNameIndex(DataSource dataSource, String schemaObjectPrefix) throws SQLException { - super(conMgr, schemaObjectPrefix); + super(dataSource, schemaObjectPrefix); } /** @@ -57,13 +61,17 @@ */ protected int insertString(String string) { // assert index does not exist + Connection connection = null; try { - connectionManager.executeStmt(nameInsertSQL, new Object[] { string }); + connection = dataSource.getConnection(); + new ConnectionHelper(connection).executeStmt(nameInsertSQL, new Object[] { string }); } catch (Exception e) { IllegalStateException ise = new IllegalStateException( "Unable to insert index for string: " + string); ise.initCause(e); throw ise; + } finally { + ConnectionHelper.closeSilently(connection); } return getIndex(string); Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (working copy) @@ -16,11 +16,13 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; -import java.util.HashMap; - +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; + +import javax.sql.DataSource; import org.apache.jackrabbit.core.util.StringIndex; @@ -40,7 +42,7 @@ /** * The class that manages statement execution and recovery from connection loss. */ - protected ConnectionRecoveryManager connectionManager; + protected DataSource dataSource; // name index statements protected String nameSelectSQL; @@ -57,9 +59,9 @@ * @param schemaObjectPrefix the prefix for table names * @throws SQLException if the statements cannot be prepared. */ - public DbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public DbNameIndex(DataSource dataSource, String schemaObjectPrefix) throws SQLException { - connectionManager = conMgr; + this.dataSource = dataSource; init(schemaObjectPrefix); } @@ -133,8 +135,10 @@ protected int insertString(String string) { // assert index does not exist int result = -1; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( + connection = dataSource.getConnection(); + Statement stmt = new ConnectionHelper(connection).executeStmt( nameInsertSQL, new Object[] { string }, true, 0); ResultSet rs = stmt.getGeneratedKeys(); try { @@ -149,6 +153,8 @@ "Unable to insert index for string: " + string); ise.initCause(e); throw ise; + } finally { + ConnectionHelper.closeSilently(connection); } if (result != -1) { return result; @@ -164,8 +170,10 @@ * @return the index or -1 if not found. */ protected int getIndex(String string) { + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( + connection = dataSource.getConnection(); + Statement stmt = new ConnectionHelper(connection).executeStmt( indexSelectSQL, new Object[] { string }); ResultSet rs = stmt.getResultSet(); try { @@ -182,6 +190,8 @@ "Unable to read index for string: " + string); ise.initCause(e); throw ise; + } finally { + ConnectionHelper.closeSilently(connection); } } @@ -194,8 +204,10 @@ protected String getString(int index) throws IllegalArgumentException, IllegalStateException { String result = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( + connection = dataSource.getConnection(); + Statement stmt = new ConnectionHelper(connection).executeStmt( nameSelectSQL, new Object[] { Integer.valueOf(index) }); ResultSet rs = stmt.getResultSet(); try { @@ -210,6 +222,8 @@ "Unable to read name for index: " + index); ise.initCause(e); throw ise; + } finally { + ConnectionHelper.closeSilently(connection); } if (result == null) { throw new IllegalArgumentException("Index not found: " + index); Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DataSourceWrapper.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DataSourceWrapper.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DataSourceWrapper.java (revision 0) @@ -0,0 +1,46 @@ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.SQLException; + +import javax.sql.DataSource; + +public class DataSourceWrapper implements DataSource { + + private final DataSource dataSource; + private final String username; + private final String password; + + public DataSourceWrapper(DataSource dataSource, String username, String password) { + this.dataSource = dataSource; + this.username = username; + this.password = password; + } + + public Connection getConnection() throws SQLException { + return dataSource.getConnection(username, password); + } + + public Connection getConnection(String username, String password) + throws SQLException { + return dataSource.getConnection(username, password); + } + + public PrintWriter getLogWriter() throws SQLException { + return dataSource.getLogWriter(); + } + + public int getLoginTimeout() throws SQLException { + return dataSource.getLoginTimeout(); + } + + public void setLogWriter(PrintWriter out) throws SQLException { + dataSource.setLogWriter(out); + } + + public void setLoginTimeout(int seconds) throws SQLException { + dataSource.setLoginTimeout(seconds); + } + +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (revision 778491) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (working copy) @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.core.persistence.PMContext; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionHelper; import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle; import org.apache.jackrabbit.core.persistence.util.Serializer; import org.apache.jackrabbit.core.state.ItemStateException; @@ -88,7 +89,14 @@ // use the Connection object for using the exact same // class loader that the Oracle driver was loaded with - blobClass = connectionManager.getConnection().getClass().getClassLoader().loadClass("oracle.sql.BLOB"); + Connection connection = null; + try { + connection = dataSource.getConnection(); + blobClass = connection.getClass().getClassLoader().loadClass("oracle.sql.BLOB"); + } finally { + ConnectionHelper.closeSilently(connection); + } + duractionSessionConstant = new Integer(blobClass.getField("DURATION_SESSION").getInt(null)); modeReadWriteConstant = @@ -105,9 +113,9 @@ /** * @inheritDoc */ - protected synchronized void storeBundle(NodePropBundle bundle) + protected synchronized void storeBundle(NodePropBundle bundle, Context context) throws ItemStateException { - Blob blob = null; + Blob blob = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); DataOutputStream dout = new DataOutputStream(out); @@ -115,9 +123,9 @@ dout.close(); String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL; - blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray())); + blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray()), (BundleDbContext)context); Object[] params = createParams(bundle.getId().getUUID(), blob, true); - connectionManager.executeStmt(sql, params); + ((BundleDbContext)context).getConnectionHelper().executeStmt(sql, params); } catch (Exception e) { String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); @@ -135,7 +143,7 @@ /** * @inheritDoc */ - public synchronized void store(NodeReferences refs) + public synchronized void store(NodeReferences refs, Context context) throws ItemStateException { if (!initialized) { throw new IllegalStateException("not initialized"); @@ -154,9 +162,9 @@ // we are synchronized on this instance, therefore we do not // not have to additionally synchronize on the preparedStatement - blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray())); + blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray()), (BundleDbContext) context); Object[] params = createParams(refs.getTargetId().getUUID(), blob, true); - connectionManager.executeStmt(sql, params); + ((BundleDbContext)context).getConnectionHelper().executeStmt(sql, params); // there's no need to close a ByteArrayOutputStream //out.close(); @@ -179,7 +187,7 @@ * Creates a temporary oracle.sql.BLOB instance via reflection and spools * the contents of the specified stream. */ - protected Blob createTemporaryBlob(InputStream in) throws Exception { + protected Blob createTemporaryBlob(InputStream in, BundleDbContext context) throws Exception { /* BLOB blob = BLOB.createTemporary(con, false, BLOB.DURATION_SESSION); blob.open(BLOB.MODE_READWRITE); @@ -190,10 +198,11 @@ blob.close(); return blob; */ + Method createTemporary = blobClass.getMethod("createTemporary", new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE}); Object blob = createTemporary.invoke(null, - new Object[]{connectionManager.getConnection(), Boolean.FALSE, duractionSessionConstant}); + new Object[]{context.getConnection(), Boolean.FALSE, duractionSessionConstant}); Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE}); open.invoke(blob, new Object[]{modeReadWriteConstant}); Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]); @@ -236,16 +245,17 @@ throws Exception { Blob blob = null; + BundleDbContext context = createContext(); try { - Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId}); + Statement stmt = context.getConnectionHelper().executeStmt(blobSelectExistSQL, new Object[]{blobId}); ResultSet rs = stmt.getResultSet(); // a BLOB exists if the result has at least one entry boolean exists = rs.next(); closeResultSet(rs); String sql = (exists) ? blobUpdateSQL : blobInsertSQL; - blob = createTemporaryBlob(in); - connectionManager.executeStmt(sql, new Object[]{blob, blobId}); + blob = createTemporaryBlob(in, context); + context.getConnectionHelper().executeStmt(sql, new Object[]{blob, blobId}); } finally { if (blob != null) { try { @@ -253,6 +263,7 @@ } catch (Exception e) { } } + context.closeSilently(); } } }