Index: src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java (working copy) @@ -16,22 +16,6 @@ */ package org.apache.jackrabbit.core.config; -import org.apache.commons.io.IOUtils; -import org.apache.jackrabbit.core.fs.FileSystem; -import org.apache.jackrabbit.core.fs.FileSystemException; -import org.apache.jackrabbit.core.fs.FileSystemPathUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.Element; -import org.xml.sax.InputSource; - -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerConfigurationException; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; @@ -48,6 +32,23 @@ import java.util.Map; import java.util.Properties; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerConfigurationException; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemException; +import org.apache.jackrabbit.core.fs.FileSystemPathUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Element; +import org.xml.sax.InputSource; + /** * Repository configuration. This configuration class is used to * create configured repository objects. @@ -223,6 +224,12 @@ */ private final DataStoreConfig dataStoreConfig; + + /** + * Optional connection provider class. + */ + private final ConnectionProviderConfig connectionProviderConfig; + /** * Creates a repository configuration object. * @@ -244,7 +251,8 @@ String workspaceDirectory, String workspaceConfigDirectory, String defaultWorkspace, int workspaceMaxIdleTime, Element template, VersioningConfig vc, SearchConfig sc, - ClusterConfig cc, DataStoreConfig dataStoreConfig, RepositoryConfigurationParser parser) { + ClusterConfig cc, DataStoreConfig dataStoreConfig, + ConnectionProviderConfig connectionProviderConfig, RepositoryConfigurationParser parser) { workspaces = new HashMap(); this.home = home; this.sec = sec; @@ -258,6 +266,7 @@ this.sc = sc; this.cc = cc; this.dataStoreConfig = dataStoreConfig; + this.connectionProviderConfig = connectionProviderConfig; this.parser = parser; } @@ -753,6 +762,16 @@ public DataStoreConfig getDataStoreConfig() { return dataStoreConfig; } + + /** + * Returns configured {@link ConnectionProviderConfig} + * + * @return {@link ConnectionProviderConfig} instance or null + * if no provider has been configured. + */ + public ConnectionProviderConfig getConnectionProviderConfig() { + return connectionProviderConfig; + } } Index: src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (working copy) @@ -16,6 +16,9 @@ */ package org.apache.jackrabbit.core.config; +import java.io.File; +import java.util.Properties; + import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; @@ -21,9 +24,6 @@ import org.w3c.dom.NodeList; import org.xml.sax.InputSource; -import java.io.File; -import java.util.Properties; - /** * Configuration parser. This class is used to parse the repository and * workspace configuration files. @@ -145,6 +145,8 @@ */ private static final String AC_PROVIDER_ELEMENT = "AccessControlProvider"; + private static final String CONNECTION_PROVIDER_ELEMENT = "ConnectionProvider"; + /** * Creates a new configuration parser with the given parser variables. * @@ -247,10 +249,16 @@ // Optional data store configuration DataStoreConfig dsc = parseDataStoreConfig(root); - + + Element connectionProviderElement = getElement(root, CONNECTION_PROVIDER_ELEMENT, false); + ConnectionProviderConfig cpc = null; + if (connectionProviderElement != null) { + cpc = new ConnectionProviderConfig(parseBeanConfig(root, CONNECTION_PROVIDER_ELEMENT)); + } + return new RepositoryConfig(home, securityConfig, fsc, workspaceDirectory, workspaceConfigDirectory, defaultWorkspace, - maxIdleTime, template, vc, sc, cc, dsc, this); + maxIdleTime, template, vc, sc, cc, dsc, cpc, this); } /** Index: src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java (revision 0) @@ -0,0 +1,29 @@ +package org.apache.jackrabbit.core.config; + +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; + + +public class ConnectionProviderConfig extends BeanConfig { + + public ConnectionProviderConfig(BeanConfig config) { + super(config); + } + + /** + * Instantiates and initializes the configured connection provider + * implementation class. + * + * @return new initialized connection provider instance. + * @throws ConfigurationException on file system initialization errors + */ + public ConnectionProvider createConnectionProvider() throws ConfigurationException { + try { + ConnectionProvider cp = (ConnectionProvider) newInstance(); + return cp; + } catch (ClassCastException e) { + throw new ConfigurationException( + "Invalid file system implementation class " + + getClassName() + ".", e); + } + } +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (working copy) @@ -50,6 +50,7 @@ import org.apache.jackrabbit.core.fs.local.LocalFileSystem; import org.apache.jackrabbit.core.persistence.PMContext; import org.apache.jackrabbit.core.persistence.bundle.util.BundleBinding; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager; import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager; import org.apache.jackrabbit.core.persistence.bundle.util.DbNameIndex; import org.apache.jackrabbit.core.persistence.bundle.util.ErrorHandling; @@ -55,6 +56,7 @@ import org.apache.jackrabbit.core.persistence.bundle.util.ErrorHandling; import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle; import org.apache.jackrabbit.core.persistence.bundle.util.StringIndex; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties; import org.apache.jackrabbit.core.persistence.util.BLOBStore; import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore; import org.apache.jackrabbit.core.persistence.util.Serializer; @@ -142,7 +144,11 @@ /** * The class that manages statement execution and recovery from connection loss. */ - protected ConnectionRecoveryManager connectionManager; + protected ConnectionRecoveryManager connectionManager__; + + protected ConnectionManager connectionManager; + + //private ConnectionManager // SQL statements for bundle management protected String bundleInsertSQL; @@ -425,9 +431,11 @@ throw new RepositoryException(msg); } BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = connectionManager.getConnection(); + Statement stmt = null; String sql = null; try { + stmt = connection.createStatement(); sql = reader.readLine(); while (sql != null) { if (!sql.startsWith("#") && sql.length() > 0 @@ -451,7 +459,9 @@ throw se; } finally { IOUtils.closeQuietly(in); - stmt.close(); + connectionManager.close(connection, stmt); + connectionManager.close(connection); + } } } @@ -477,19 +487,24 @@ * @throws RepositoryException if a repository exception occurs. */ protected boolean checkTablesExist() throws SQLException, RepositoryException { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String tableName = schemaObjectPrefix + "BUNDLE"; - if (metaData.storesLowerCaseIdentifiers()) { - tableName = tableName.toLowerCase(); - } else if (metaData.storesUpperCaseIdentifiers()) { - tableName = tableName.toUpperCase(); - } - String userName = checkTablesWithUser() ? metaData.getUserName() : null; - ResultSet rs = metaData.getTables(null, userName, tableName, null); + Connection connection = connectionManager.getConnection(); try { - return rs.next(); + DatabaseMetaData metaData = connection.getMetaData(); + String tableName = schemaObjectPrefix + "BUNDLE"; + if (metaData.storesLowerCaseIdentifiers()) { + tableName = tableName.toLowerCase(); + } else if (metaData.storesUpperCaseIdentifiers()) { + tableName = tableName.toUpperCase(); + } + String userName = checkTablesWithUser() ? metaData.getUserName() : null; + ResultSet rs = metaData.getTables(null, userName, tableName, null); + try { + return rs.next(); + } finally { + rs.close(); + } } finally { - rs.close(); + connectionManager.close(connection); } } @@ -514,33 +529,34 @@ public synchronized void store(ChangeLog changeLog) throws ItemStateException { int trials = 2; Throwable lastException = null; - do { - trials--; - Connection con = null; - try { - con = connectionManager.getConnection(); - connectionManager.setAutoReconnect(false); - con.setAutoCommit(false); - super.store(changeLog); - con.commit(); - con.setAutoCommit(true); - return; - } catch (Throwable th) { - lastException = th; + Connection con = null; + try { + do { + trials--; try { - if (con != null) { - con.rollback(); + con = connectionManager.getConnection(); + con.setAutoCommit(false); + super.store(changeLog); + con.commit(); + con.setAutoCommit(true); + return; + } catch (Throwable th) { + lastException = th; + try { + if (con != null) { + con.rollback(); + } + } catch (SQLException e) { + logException("rollback failed", e); } - } catch (SQLException e) { - logException("rollback failed", e); - } - if (th instanceof SQLException || th.getCause() instanceof SQLException) { - connectionManager.close(); - } - } finally { - connectionManager.setAutoReconnect(true); - } - } while(blockOnConnectionLoss || trials > 0); + if (th instanceof SQLException || th.getCause() instanceof SQLException) { + connectionManager.close(con); + } + } + } while(blockOnConnectionLoss || trials > 0); + } finally { + connectionManager.close(con); + } throw new ItemStateException(lastException.getMessage()); } @@ -555,9 +571,17 @@ this.name = context.getHomeDir().getName(); - connectionManager = new ConnectionRecoveryManager(blockOnConnectionLoss, - getDriver(), getUrl(), getUser(), getPassword()); +// connectionManager__ = new ConnectionRecoveryManager(blockOnConnectionLoss, +// getDriver(), getUrl(), getUser(), getPassword()); + ConnectionProperties properties = new ConnectionProperties(); + properties.setUrl(getUrl()); + properties.setUser(getUser()); + properties.setPassword(getPassword()); + properties.setDriver(getDriver()); + + connectionManager = new ConnectionManager(context.getConnectionProvider(), properties); + // make sure schemaObjectPrefix consists of legal name characters only prepareSchemaObjectPrefix(); @@ -763,9 +787,11 @@ // get all node bundles in the database with a single sql statement, // which is (probably) faster than loading each bundle and traversing the tree ResultSet rs = null; + Connection connection = null; try { + connection = connectionManager.getConnection(); String sql = "select count(*) from " + schemaObjectPrefix + "BUNDLE"; - Statement stmt = connectionManager.executeStmt(sql, new Object[0]); + Statement stmt = connectionManager.executeStmt(connection, sql, new Object[0]); try { rs = stmt.getResultSet(); if (!rs.next()) { @@ -781,7 +807,7 @@ } else { sql = "select NODE_ID_HI, NODE_ID_LO from " + schemaObjectPrefix + "BUNDLE"; } - stmt = connectionManager.executeStmt(sql, new Object[0]); + stmt = connectionManager.executeStmt(connection, sql, new Object[0]); rs = stmt.getResultSet(); // iterate over all node bundles in the db @@ -797,7 +823,7 @@ ResultSet bRs = null; byte[] data = null; try { - Statement bSmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + Statement bSmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); bRs = bSmt.getResultSet(); if (!bRs.next()) { throw new SQLException("bundle cannot be retrieved?"); @@ -808,7 +834,6 @@ closeResultSet(bRs); } - try { // parse and check bundle // checkBundle will log any problems itself @@ -833,6 +858,7 @@ log.error("Error loading bundle", e); } finally { closeResultSet(rs); + connectionManager.close(connection); total = count; } } else { @@ -917,25 +943,31 @@ * @throws Exception if an error occurs */ protected void prepareSchemaObjectPrefix() throws Exception { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String legalChars = metaData.getExtraNameCharacters(); - legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; - - String prefix = schemaObjectPrefix.toUpperCase(); - StringBuffer escaped = new StringBuffer(); - for (int i = 0; i < prefix.length(); i++) { - char c = prefix.charAt(i); - if (legalChars.indexOf(c) == -1) { - escaped.append("_x"); - String hex = Integer.toHexString(c); - escaped.append("0000".toCharArray(), 0, 4 - hex.length()); - escaped.append(hex); - escaped.append("_"); - } else { - escaped.append(c); + Connection connection = null; + try { + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + String legalChars = metaData.getExtraNameCharacters(); + legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; + + String prefix = schemaObjectPrefix.toUpperCase(); + StringBuffer escaped = new StringBuffer(); + for (int i = 0; i < prefix.length(); i++) { + char c = prefix.charAt(i); + if (legalChars.indexOf(c) == -1) { + escaped.append("_x"); + String hex = Integer.toHexString(c); + escaped.append("0000".toCharArray(), 0, 4 - hex.length()); + escaped.append(hex); + escaped.append("_"); + } else { + escaped.append(c); + } } + schemaObjectPrefix = escaped.toString(); + } finally { + connectionManager.close(connection); } - schemaObjectPrefix = escaped.toString(); } /** @@ -950,7 +982,6 @@ if (nameIndex instanceof DbNameIndex) { ((DbNameIndex) nameIndex).close(); } - connectionManager.close(); // close blob store blobStore.close(); blobStore = null; @@ -1035,6 +1066,7 @@ public synchronized NodeIdIterator getAllNodeIds(NodeId bigger, int maxCount) throws ItemStateException, RepositoryException { ResultSet rs = null; + Connection connection = null; try { UUID lowUuid; Object[] keys; @@ -1055,7 +1087,8 @@ // see also bundleSelectAllIdsFrom SQL statement maxCount += 10; } - Statement stmt = connectionManager.executeStmt(sql, keys, false, maxCount); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, sql, keys, false, maxCount); rs = stmt.getResultSet(); ArrayList result = new ArrayList(); while ((maxCount == 0 || result.size() < maxCount) && rs.next()) { @@ -1082,6 +1115,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1131,8 +1165,10 @@ protected synchronized NodePropBundle loadBundle(NodeId id, boolean checkBeforeLoading) throws ItemStateException { ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); rs = stmt.getResultSet(); if (!rs.next()) { return null; @@ -1160,6 +1196,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1168,8 +1205,10 @@ */ protected synchronized boolean existsBundle(NodeId id) throws ItemStateException { ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); rs = stmt.getResultSet(); // a bundle exists, if the result has at least one entry return rs.next(); @@ -1179,6 +1218,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1186,6 +1226,7 @@ * {@inheritDoc} */ protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException { + Connection connection = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); DataOutputStream dout = new DataOutputStream(out); @@ -1194,7 +1235,8 @@ String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL; Object[] params = createParams(bundle.getId().getUUID(), out.toByteArray(), true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); } catch (Exception e) { String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); @@ -1199,6 +1241,8 @@ String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1206,8 +1250,10 @@ * {@inheritDoc} */ protected synchronized void destroyBundle(NodePropBundle bundle) throws ItemStateException { + Connection connection = null; try { - connectionManager.executeStmt(bundleDeleteSQL, getKey(bundle.getId().getUUID())); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, bundleDeleteSQL, getKey(bundle.getId().getUUID())); } catch (Exception e) { if (e instanceof NoSuchItemStateException) { throw (NoSuchItemStateException) e; @@ -1215,6 +1261,8 @@ String msg = "failed to delete bundle: " + bundle.getId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1229,8 +1277,10 @@ ResultSet rs = null; InputStream in = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nodeReferenceSelectSQL, getKey(targetId.getTargetId().getUUID())); rs = stmt.getResultSet(); if (!rs.next()) { @@ -1252,6 +1302,7 @@ } finally { IOUtils.closeQuietly(in); closeResultSet(rs); + connectionManager.close(connection); } } @@ -1272,7 +1323,7 @@ // check if insert or update boolean update = exists(refs.getId()); String sql = (update) ? nodeReferenceUpdateSQL : nodeReferenceInsertSQL; - + Connection connection = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); @@ -1280,7 +1331,8 @@ Serializer.serialize(refs, out); Object[] params = createParams(refs.getTargetId().getUUID(), out.toByteArray(), true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); // there's no need to close a ByteArrayOutputStream //out.close(); @@ -1288,6 +1340,8 @@ String msg = "failed to write node references: " + refs.getId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1298,9 +1352,10 @@ if (!initialized) { throw new IllegalStateException("not initialized"); } - + Connection connection = null; try { - connectionManager.executeStmt(nodeReferenceDeleteSQL, + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, nodeReferenceDeleteSQL, getKey(refs.getTargetId().getUUID())); } catch (Exception e) { if (e instanceof NoSuchItemStateException) { @@ -1309,6 +1364,8 @@ String msg = "failed to delete references: " + refs.getTargetId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1321,8 +1378,10 @@ } ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(nodeReferenceSelectSQL, + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nodeReferenceSelectSQL, getKey(targetId.getTargetId().getUUID())); rs = stmt.getResultSet(); @@ -1335,6 +1394,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1521,31 +1581,37 @@ * {@inheritDoc} */ public InputStream get(String blobId) throws Exception { - Statement stmt = connectionManager.executeStmt(blobSelectSQL, new Object[]{blobId}); - final ResultSet rs = stmt.getResultSet(); - if (!rs.next()) { - closeResultSet(rs); - throw new Exception("no such BLOB: " + blobId); - } - InputStream in = rs.getBinaryStream(1); - if (in == null) { - // some databases treat zero-length values as NULL; - // return empty InputStream in such a case - closeResultSet(rs); - return new ByteArrayInputStream(new byte[0]); - } - - /** - * return an InputStream wrapper in order to - * close the ResultSet when the stream is closed - */ - return new FilterInputStream(in) { - public void close() throws IOException { - in.close(); - // now it's safe to close ResultSet + Connection connection = null; + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, blobSelectSQL, new Object[]{blobId}); + final ResultSet rs = stmt.getResultSet(); + if (!rs.next()) { + closeResultSet(rs); + throw new Exception("no such BLOB: " + blobId); + } + InputStream in = rs.getBinaryStream(1); + if (in == null) { + // some databases treat zero-length values as NULL; + // return empty InputStream in such a case closeResultSet(rs); + return new ByteArrayInputStream(new byte[0]); } - }; + + /** + * return an InputStream wrapper in order to + * close the ResultSet when the stream is closed + */ + return new FilterInputStream(in) { + public void close() throws IOException { + in.close(); + // now it's safe to close ResultSet + closeResultSet(rs); + } + }; + } finally { + connectionManager.close(connection); + } } /** @@ -1553,15 +1619,22 @@ */ public synchronized void put(String blobId, InputStream in, long size) throws Exception { - Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId}); - ResultSet rs = stmt.getResultSet(); - // a BLOB exists if the result has at least one entry - boolean exists = rs.next(); - closeResultSet(rs); + Connection connection = null; + try { + connection = connectionManager.getConnection(); + + Statement stmt = connectionManager.executeStmt(connection, blobSelectExistSQL, new Object[]{blobId}); + ResultSet rs = stmt.getResultSet(); + // a BLOB exists if the result has at least one entry + boolean exists = rs.next(); + closeResultSet(rs); - String sql = (exists) ? blobUpdateSQL : blobInsertSQL; - Object[] params = new Object[]{new ConnectionRecoveryManager.StreamWrapper(in, size), blobId}; - connectionManager.executeStmt(sql, params); + String sql = (exists) ? blobUpdateSQL : blobInsertSQL; + Object[] params = new Object[]{new ConnectionRecoveryManager.StreamWrapper(in, size), blobId}; + connectionManager.executeStmt(connection, sql, params); + } finally { + connectionManager.close(connection); + } } /** @@ -1568,8 +1641,14 @@ * {@inheritDoc} */ public synchronized boolean remove(String blobId) throws Exception { - Statement stmt = connectionManager.executeStmt(blobDeleteSQL, new Object[]{blobId}); - return stmt.getUpdateCount() == 1; + Connection connection = null; + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, blobDeleteSQL, new Object[]{blobId}); + return stmt.getUpdateCount() == 1; + } finally { + connectionManager.close(connection); + } } public void close() { Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (working copy) @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory; import org.apache.jackrabbit.core.persistence.PMContext; +import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; @@ -260,8 +261,11 @@ protected void checkSchema() throws SQLException, RepositoryException { // set properties if (DERBY_EMBEDDED_DRIVER.equals(getDriver())) { - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = null; + Statement stmt = null; try { + connection = connectionManager.getConnection(); + stmt = connection.createStatement(); stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY " + "('derby.storage.initialPages', '" + derbyStorageInitialPages + "')"); stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY " @@ -273,8 +277,9 @@ stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY " + "('derby.storage.pageSize', '" + derbyStoragePageSize + "')"); - } finally { - stmt.close(); + } finally { + connectionManager.close(connection, stmt); + connectionManager.close(connection); } } super.checkSchema(); @@ -304,31 +309,38 @@ if (!DERBY_EMBEDDED_DRIVER.equals(getDriver())) { return; } + + Connection connection = null; + try { + connection = connectionManager.getConnection(); + // prepare connection url for issuing shutdown command + String url = connection.getMetaData().getURL(); + int pos = url.lastIndexOf(';'); + if (pos != -1) { + // strip any attributes from connection url + url = url.substring(0, pos); + } + url += ";shutdown=true"; - // prepare connection url for issuing shutdown command - String url = connectionManager.getConnection().getMetaData().getURL(); - int pos = url.lastIndexOf(';'); - if (pos != -1) { - // strip any attributes from connection url - url = url.substring(0, pos); - } - url += ";shutdown=true"; + // we have to reset the connection to 'autoCommit=true' before closing it; + // otherwise Derby would mysteriously complain about some pending uncommitted + // changes which can't possibly be true. + // @todo further investigate + connection.setAutoCommit(true); - // we have to reset the connection to 'autoCommit=true' before closing it; - // otherwise Derby would mysteriously complain about some pending uncommitted - // changes which can't possibly be true. - // @todo further investigate - connectionManager.getConnection().setAutoCommit(true); + super.close(); - super.close(); + // now it's safe to shutdown the embedded Derby database + try { + DriverManager.getConnection(url); + } catch (SQLException e) { + // a shutdown command always raises a SQLException + log.info(e.getMessage()); + } - // now it's safe to shutdown the embedded Derby database - try { - DriverManager.getConnection(url); - } catch (SQLException e) { - // a shutdown command always raises a SQLException - log.info(e.getMessage()); - } + } finally { + connectionManager.close(connection); + } } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (working copy) @@ -18,6 +18,7 @@ import org.apache.jackrabbit.core.persistence.PMContext; +import java.sql.Connection; import java.sql.Statement; import java.sql.SQLException; @@ -95,11 +96,15 @@ * {@inheritDoc} */ protected void checkSchema() throws SQLException, RepositoryException { - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = null; + Statement stmt = null; try { + connection = connectionManager.getConnection(); + stmt = connection.createStatement(); stmt.execute("SET LOCK_TIMEOUT " + lockTimeout); } finally { - stmt.close(); + connectionManager.close(connection, stmt); + connectionManager.close(connection); } super.checkSchema(); } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (working copy) @@ -88,11 +88,17 @@ // use the Connection object for using the exact same // class loader that the Oracle driver was loaded with - blobClass = connectionManager.getConnection().getClass().getClassLoader().loadClass("oracle.sql.BLOB"); - duractionSessionConstant = - new Integer(blobClass.getField("DURATION_SESSION").getInt(null)); - modeReadWriteConstant = - new Integer(blobClass.getField("MODE_READWRITE").getInt(null)); + Connection connection = null; + try { + connection = connectionManager.getConnection(); + blobClass = connection.getClass().getClassLoader().loadClass("oracle.sql.BLOB"); + duractionSessionConstant = + new Integer(blobClass.getField("DURATION_SESSION").getInt(null)); + modeReadWriteConstant = + new Integer(blobClass.getField("MODE_READWRITE").getInt(null)); + } finally { + connectionManager.close(connection); + } } /** @@ -108,6 +114,7 @@ protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException { Blob blob = null; + Connection connection = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); DataOutputStream dout = new DataOutputStream(out); @@ -117,7 +124,8 @@ String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL; blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray())); Object[] params = createParams(bundle.getId().getUUID(), blob, true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); } catch (Exception e) { String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); @@ -129,6 +137,7 @@ } catch (Exception e1) { } } + connectionManager.close(connection); } } @@ -142,6 +151,7 @@ } Blob blob = null; + Connection connection = null; try { // check if insert or update boolean update = exists(refs.getId()); @@ -156,7 +166,8 @@ blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray())); Object[] params = createParams(refs.getTargetId().getUUID(), blob, true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); // there's no need to close a ByteArrayOutputStream //out.close(); @@ -171,6 +182,7 @@ } catch (Exception e1) { } } + connectionManager.close(connection); } } @@ -190,26 +202,33 @@ blob.close(); return blob; */ - Method createTemporary = blobClass.getMethod("createTemporary", - new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE}); - Object blob = createTemporary.invoke(null, - new Object[]{connectionManager.getConnection(), Boolean.FALSE, duractionSessionConstant}); - Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE}); - open.invoke(blob, new Object[]{modeReadWriteConstant}); - Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]); - OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null); - try { - IOUtils.copy(in, out); - } finally { + + Connection connection = null; + try { + connection = connectionManager.getConnection(); + Method createTemporary = blobClass.getMethod("createTemporary", + new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE}); + Object blob = createTemporary.invoke(null, + new Object[]{connection, Boolean.FALSE, duractionSessionConstant}); + Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE}); + open.invoke(blob, new Object[]{modeReadWriteConstant}); + Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]); + OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null); try { - out.flush(); - } catch (IOException ioe) { + IOUtils.copy(in, out); + } finally { + try { + out.flush(); + } catch (IOException ioe) { + } + out.close(); } - out.close(); + Method close = blobClass.getMethod("close", new Class[0]); + close.invoke(blob, null); + return (Blob) blob; + } finally { + connectionManager.close(connection); } - Method close = blobClass.getMethod("close", new Class[0]); - close.invoke(blob, null); - return (Blob) blob; } /** @@ -234,8 +253,11 @@ throws Exception { Blob blob = null; + Connection connection = null; + Statement stmt = null; try { - Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId}); + connection = connectionManager.getConnection(); + stmt = connectionManager.executeStmt(connection, blobSelectExistSQL, new Object[]{blobId}); ResultSet rs = stmt.getResultSet(); // a BLOB exists if the result has at least one entry boolean exists = rs.next(); @@ -243,7 +265,7 @@ String sql = (exists) ? blobUpdateSQL : blobInsertSQL; blob = createTemporaryBlob(in); - connectionManager.executeStmt(sql, new Object[]{blob, blobId}); + connectionManager.executeStmt(connection, sql, new Object[]{blob, blobId}); } finally { if (blob != null) { try { @@ -251,6 +273,8 @@ } catch (Exception e) { } } + connectionManager.close(connection, stmt); + connectionManager.close(connection); } } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (working copy) @@ -16,6 +16,7 @@ */ package org.apache.jackrabbit.core.persistence.bundle; +import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.SQLException; @@ -104,8 +105,10 @@ super.init(context); // check driver version + Connection connection = null; try { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); if (metaData.getDriverMajorVersion() < 10) { // Oracle drivers prior to version 10 only support // writing BLOBs up to 32k in size... @@ -115,6 +118,8 @@ } } catch (SQLException e) { log.warn("Can not retrieve driver version", e); + } finally { + connectionManager.close(connection); } } @@ -159,20 +164,26 @@ * @inheritDoc */ protected void prepareSchemaObjectPrefix() throws Exception { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String legalChars = metaData.getExtraNameCharacters(); - legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; + Connection connection = null; + try { + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + String legalChars = metaData.getExtraNameCharacters(); + legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; - String prefix = schemaObjectPrefix.toUpperCase(); - StringBuffer escaped = new StringBuffer(); - for (int i = 0; i < prefix.length(); i++) { - char c = prefix.charAt(i); - if (legalChars.indexOf(c) == -1) { - escaped.append('_'); - } else { - escaped.append(c); + String prefix = schemaObjectPrefix.toUpperCase(); + StringBuffer escaped = new StringBuffer(); + for (int i = 0; i < prefix.length(); i++) { + char c = prefix.charAt(i); + if (legalChars.indexOf(c) == -1) { + escaped.append('_'); + } else { + escaped.append(c); + } } + schemaObjectPrefix = escaped.toString(); + } finally { + connectionManager.close(connection); } - schemaObjectPrefix = escaped.toString(); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (working copy) @@ -28,6 +28,7 @@ import java.io.DataInputStream; import java.io.InputStream; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -90,8 +91,11 @@ protected synchronized NodePropBundle loadBundle(NodeId id) throws ItemStateException { + Connection connection = null; + Statement stmt = null; try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + connection = connectionManager.getConnection(); + stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); ResultSet rs = stmt.getResultSet(); try { if (rs.next()) { @@ -115,6 +119,9 @@ String msg = "failed to read bundle: " + id + ": " + e; log.error(msg); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection, stmt); + connectionManager.close(connection); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java (working copy) @@ -462,8 +462,8 @@ */ public static class StreamWrapper { - private final InputStream stream; - private final long size; + final InputStream stream; + final long size; /** * Creates a wrapper for the given InputStream that can Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (working copy) @@ -16,11 +16,11 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; -import java.util.HashMap; - +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; /** * Implements a {@link StringIndex} that stores and retrieves the names from a @@ -38,7 +38,7 @@ /** * The class that manages statement execution and recovery from connection loss. */ - protected ConnectionRecoveryManager connectionManager; + protected ConnectionManager connectionManager; // name index statements protected String nameSelectSQL; @@ -55,7 +55,7 @@ * @param schemaObjectPrefix the prefix for table names * @throws SQLException if the statements cannot be prepared. */ - public DbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public DbNameIndex(ConnectionManager conMgr, String schemaObjectPrefix) throws SQLException { connectionManager = conMgr; init(schemaObjectPrefix); @@ -134,8 +134,10 @@ protected int insertString(String string) { // assert index does not exist ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(nameInsertSQL, new Object[]{string}, true, 0); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string}, true, 0); rs = stmt.getGeneratedKeys(); if (!rs.next()) { return -1; @@ -147,6 +149,7 @@ ise.initCause(e); throw ise; } finally { + connectionManager.close(connection); closeResultSet(rs); } } @@ -158,8 +161,10 @@ */ protected int getIndex(String string) { ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(indexSelectSQL, new Object[]{string}); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, indexSelectSQL, new Object[]{string}); rs = stmt.getResultSet(); if (!rs.next()) { return -1; @@ -171,6 +176,7 @@ ise.initCause(e); throw ise; } finally { + connectionManager.close(connection); closeResultSet(rs); } } @@ -182,8 +188,10 @@ */ protected String getString(int index) { ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(nameSelectSQL, new Object[]{new Integer(index)}); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nameSelectSQL, new Object[]{new Integer(index)}); rs = stmt.getResultSet(); if (!rs.next()) { return null; @@ -195,6 +203,7 @@ ise.initCause(e); throw ise; } finally { + connectionManager.close(connection); closeResultSet(rs); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (working copy) @@ -16,8 +16,10 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; +import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.ResultSet; +import java.sql.Statement; /** * Same as {@link DbNameIndex} but does not make use of the @@ -32,7 +34,7 @@ * @param schemaObjectPrefix the prefix for table names * @throws SQLException if the statements cannot be prepared. */ - public NGKDbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public NGKDbNameIndex(ConnectionManager conMgr, String schemaObjectPrefix) throws SQLException { super(conMgr, schemaObjectPrefix); } @@ -59,8 +61,10 @@ protected int insertString(String string) { // assert index does not exist ResultSet rs = null; + Connection connection = null; try { - connectionManager.executeStmt(nameInsertSQL, new Object[]{string}); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string}); return getIndex(string); } catch (Exception e) { IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string); @@ -67,6 +71,7 @@ ise.initCause(e); throw ise; } finally { + connectionManager.close(connection); closeResultSet(rs); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (working copy) @@ -16,6 +16,8 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -28,7 +30,7 @@ protected String generatedKeySelectSQL; - public PostgreSQLNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public PostgreSQLNameIndex(ConnectionManager conMgr, String schemaObjectPrefix) throws SQLException { super(conMgr, schemaObjectPrefix); } @@ -59,9 +61,11 @@ */ protected int insertString(String string) { // assert index does not exist - try { - connectionManager.executeStmt(nameInsertSQL, new Object[]{string}); - return getGeneratedKey(); + Connection connection = null; + try { + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string}); + return getGeneratedKey(connection); } catch (Exception e) { IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string); ise.initCause(e); @@ -66,6 +70,8 @@ IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string); ise.initCause(e); throw ise ; + } finally { + connectionManager.close(connection); } } @@ -73,9 +79,9 @@ * Retrieves the last assigned key from the database. * @return the index. */ - protected int getGeneratedKey() { + protected int getGeneratedKey(Connection connection) { try { - ResultSet rs = connectionManager.executeQuery(generatedKeySelectSQL); + ResultSet rs = connectionManager.executeQuery(connection, generatedKeySelectSQL); try { if (!rs.next()) { return -1; Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java (revision 0) @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.io.InputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import javax.jcr.RepositoryException; + +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager.StreamWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; + +/** + * This class provides methods to get a database connection and to execute SQL statements. + */ +public class ConnectionManager { + + private final ConnectionProvider connectionProvider; + private final ConnectionProperties connectionProperties; + + /** + * Creates a new {@link ConnectionManager} instance + * + * @param connectionProvider + * @param connectionProperties + */ + public ConnectionManager(ConnectionProvider connectionProvider, ConnectionProperties connectionProperties) { + this.connectionProvider = connectionProvider; + this.connectionProperties = connectionProperties; + } + + /** + * Returns a database {@link Connection} + * @return + * @throws SQLException + * @throws RepositoryException + */ + public Connection getConnection() throws SQLException, RepositoryException { + Connection connection = connectionProvider.getConnection(connectionProperties); + if (!connection.getAutoCommit()) + { + connection.setAutoCommit(true); + } + Map/**/ statements = new HashMap(); + + preparedStatements.put(connection, statements); + return connection; + }; + + /** + * Closes the database {@link Connection}. This method must always be called to close a connection + * (instead of calling {@link Connection#close()}). + * @param connection + */ + public void close(Connection connection) { + if (connection != null) { + preparedStatements.remove(connection); + try { + connectionProvider.closeConnection(connection); + } catch (SQLException e) { + log.error("Error closing SQL connection", e); + } + } + } + + public void close(Connection connection, Statement statement) { + if (statement != null) { + try { + if (connection != null) { + Map map = (Map) preparedStatements.get(connection); + if (map != null) { + map.remove(statement); + } + } + statement.close(); + } catch (SQLException e) { + log.error("Error closing SQL statement", e); + } + } + } + + private Map/*>*/ preparedStatements = new ConcurrentHashMap(); + + /** + * Executes the given SQL query. Retries once or blocks (when the + * block parameter has been set to true on construction) + * if this fails and autoReconnect is enabled. + * + * @param sql the SQL query to execute + * @return the executed ResultSet + * @throws SQLException on error + * @throws RepositoryException if the database driver could not be loaded + */ + public synchronized ResultSet executeQuery(Connection connection, String sql) throws SQLException, RepositoryException { + return executeQueryInternal(connection, sql); + } + + /** + * Executes the given SQL query. + * + * @param sql query to execute + * @return a ResultSet object + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + private ResultSet executeQueryInternal(Connection connection, String sql) throws SQLException, RepositoryException { + PreparedStatement stmt = null; + try { + Map statements = (Map) preparedStatements.get(connection); + stmt = (PreparedStatement) statements.get(sql); + if (stmt == null) { + stmt = connection.prepareStatement(sql); + statements.put(sql, stmt); + } + return stmt.executeQuery(); + } catch (SQLException e) { + logException("could not execute statement", e); + throw e; + } finally { + resetStatement(stmt); + } + } + + /** + * Resets the given PreparedStatement by clearing the + * parameters and warnings contained. + * + * @param stmt The PreparedStatement to reset. If + * null this method does nothing. + */ + private void resetStatement(PreparedStatement stmt) { + if (stmt != null) { + try { + stmt.clearParameters(); + stmt.clearWarnings(); + } catch (SQLException se) { + logException("Failed resetting PreparedStatement", se); + } + } + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + public PreparedStatement executeStmt(Connection connection, String sql, Object[] params) throws SQLException, RepositoryException { + return executeStmt(connection, sql, params, false, 0); + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @param returnGeneratedKeys if the statement should return auto generated keys + * @param maxRows the maximum number of rows to return (0 for all rows) + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + public synchronized PreparedStatement executeStmt(Connection connection, + String sql, Object[] params, boolean returnGeneratedKeys, int maxRows) + throws SQLException, RepositoryException { + return executeStmtInternal(connection, sql, params, returnGeneratedKeys, maxRows); + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @param returnGeneratedKeys if the statement should return auto generated keys + * @param maxRows the maximum number of rows to return (0 for all rows) + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + private PreparedStatement executeStmtInternal(Connection connection, + String sql, Object[] params, boolean returnGeneratedKeys, int maxRows) + throws SQLException, RepositoryException { + try { + String key = sql; + if (returnGeneratedKeys) { + key += " RETURN_GENERATED_KEYS"; + } + Map statements = (Map) preparedStatements.get(connection); + PreparedStatement stmt = (PreparedStatement) statements.get(key); + if (stmt == null) { + if (returnGeneratedKeys) { + stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + } else { + stmt = connection.prepareStatement(sql); + } + statements.put(key, stmt); + } + stmt.setMaxRows(maxRows); + return executeStmtInternal(connection, params, stmt); + } catch (SQLException e) { + logException("could not execute statement", e); + throw e; + } + } + + /** + * @param params the parameters for the stmt parameter + * @param stmt the statement to execute + * @return the executed Statement + * @throws SQLException on error + */ + private PreparedStatement executeStmtInternal(Connection connection, Object[] params, PreparedStatement stmt) throws SQLException { + for (int i = 0; params != null && i < params.length; i++) { + Object p = params[i]; + if (p instanceof StreamWrapper) { + StreamWrapper wrapper = (StreamWrapper) p; + stmt.setBinaryStream(i + 1, wrapper.stream, (int) wrapper.size); + } else if (p instanceof InputStream) { + InputStream stream = (InputStream) p; + stmt.setBinaryStream(i + 1, stream, -1); + } else { + stmt.setObject(i + 1, p); + } + } + stmt.execute(); + resetStatement(stmt); + return stmt; + } + + /** + * A wrapper for a binary stream that includes the + * size of the stream. + * + */ +// public static class StreamWrapper { +// +// private final InputStream stream; +// private final long size; +// +// /** +// * Creates a wrapper for the given InputStream that can +// * safely be passed as a parameter to the executeStmt +// * methods in the {@link ConnectionRecoveryManager} class. +// * +// * @param in the InputStream to wrap +// * @param size the size of the input stream +// */ +// public StreamWrapper(InputStream in, long size) { +// this.stream = in; +// this.size = size; +// } +// } + + /** + * Logs an sql exception. + * + * @param message the message + * @param se the exception + */ + private void logException(String message, SQLException se) { + message = message == null ? "" : message; + log.error(message + ", reason: " + se.getMessage() + ", state/code: " + + se.getSQLState() + "/" + se.getErrorCode()); + log.debug(" dump:", se); + } + + private static final Logger log = LoggerFactory.getLogger(ConnectionManager.class); +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java (revision 0) @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.sql.Connection; +import java.sql.SQLException; + +import javax.jcr.RepositoryException; + + +/** + * Class responsible for providing SQL {@link Connection}s. + *

+ * Implementation of this class can do connection pooling, in which case it must + * take {@link ConnectionProperties} into account and only return connection + * that matches the specified properties. + */ +public interface ConnectionProvider { + + /** + * Returns connection with given properties. + * + * @param properties + * connection properties + * @return SQL {@link Connection} + * + * @throws RepositoryException + * @throws SQLException + */ + public Connection getConnection(ConnectionProperties properties) + throws RepositoryException, SQLException; + + /** + * Closes the given connection. Classes that obtain connections through + * {@link ConnectionProvider} must never call {@link Connection#close()} + * directly. They are required to call {@link #closeConnection(Connection)} + * instead. + * + * @param connection + * SQL {@link Connection} + * + * @throws SQLException + */ + public void closeConnection(Connection connection) throws SQLException; + + /** + * Invoked when the repository is being shut down. + * + * @throws RepositoryException + */ + public void dispose() throws RepositoryException; + + /** + * Bean that holds properties necessary to create or identify a SQL + * {@link Connection}. + */ + public final static class ConnectionProperties { + private String user; + private String password; + private String url; + private String driver; + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setDriver(String driver) { + this.driver = driver; + } + + public String getDriver() { + return driver; + } + + private boolean equals(String s1, String s2) { + return s1 == s2 || (s1 != null && s1.equals(s2)); + } + + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof ConnectionProperties == false) { + return false; + } + ConnectionProperties cp = (ConnectionProperties) obj; + return equals(user, cp.user) && equals(password, cp.password) + && equals(url, cp.url) && equals(driver, cp.driver); + + } + + private int hashCode(String s) { + return s != null ? s.hashCode() : 0; + } + + public int hashCode() { + return hashCode(user) + 37 * hashCode(password) + 373 + & hashCode(url) + 1187 * hashCode(driver); + } + } +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java (revision 0) @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.jcr.RepositoryException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple {@link ConnectionProvider} implementation that pools the database + * {@link Connection}s. This is a proof of concept implementation and should + * not be used for production. + */ +public class SimplePoolingConnectionProvider implements ConnectionProvider { + + public SimplePoolingConnectionProvider() { + + } + + public void closeConnection(Connection connection) throws SQLException { + if (connection != null) { + Pool pool = null; + synchronized(this) { + pool = (Pool) connectionToPool.remove(connection); + } + if (pool == null) { + log.warn("Trying to close connection not opened by this ConnectionManager"); + } else { + pool.returnConnection(connection); + } + } + } + + public synchronized void dispose() throws RepositoryException { + connectionToPool.clear(); + for (Iterator i = driverToPool.values().iterator(); i.hasNext(); ) { + Pool pool = (Pool) i.next(); + pool.dispose(); + } + connectionToPool.clear(); + } + + public Connection getConnection(ConnectionProperties properties) + throws RepositoryException, SQLException { + + if (properties == null) { + throw new IllegalArgumentException("Argument 'properties' may not be null."); + } + if (properties.getDriver() == null) { + throw new IllegalArgumentException("Database driver may not be null."); + } + + Pool pool; + synchronized(this) { + pool = (Pool) driverToPool.get(properties.getDriver()); + if (pool == null) { + pool = new Pool(); + driverToPool.put(properties.getDriver(), pool); + } + } + Connection c = pool.getConnection(properties); + synchronized(this) { + connectionToPool.put(c, pool); + } + return c; + } + + private int minConnections = 10; + private int maxConnections = 20; + private boolean block = true; + + /** + * Sets the maximal amount of pooled connections. If there is demand for more connections, + * the manager will either block (if {@link #setBlock(boolean)} is true) or throw an exception + * (if {@link #setBlock(boolean)} is false). + * + * @param maxConnections + */ + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + + /** + * Returns the maximal amount of pooled connections. + * @return + */ + public int getMaxConnections() { + return maxConnections; + } + + /** + * Returns the minimal amount of pooled connections. + * @return + */ + public int getMinConnections() { + return minConnections; + } + + /** + * Sets the minimal amount of pooled connections. If connection is returned to the pool + * and there are more connections in pool than minimal amount of pooled connection, + * the last accessed avilable connection will be removed. + * @param minConnections + */ + public void setMinConnections(int minConnections) { + this.minConnections = minConnections; + } + + /** + * Sets the blocking behavior when there is no available connection in pool. + * If block is true, the requesting thread will be blocked, otherwise + * if block is false, and exception will be thrown. + * @param block + */ + public void setBlock(boolean block) { + this.block = block; + } + + public boolean isBlock() { + return block; + } + + private Map driverToPool = new HashMap(); + private Map connectionToPool = new HashMap(); + + private class Pool { + + private Pool() { + + } + + private final List entries = new ArrayList(); + + private class ConnectionEntry { + + private ConnectionEntry(Connection connection, + ConnectionProperties connectionProperties, boolean available) { + this.connection = connection; + this.connectionProperties = connectionProperties; + this.available = available; + touch(); + } + + private final Connection connection; + private final ConnectionProperties connectionProperties; + private boolean available; + + public Connection getConnection() { + return connection; + } + + public ConnectionProperties getConnectionProperties() { + return connectionProperties; + } + + public void setAvailable(boolean available) { + this.available = available; + } + + public boolean isAvailable() { + return available; + } + + public void touch() { + lastAccessed = System.currentTimeMillis(); + } + + public long getLastAccessed() { + return lastAccessed; + } + + private long lastAccessed; + }; + + private Connection tryGetConnection(ConnectionProperties properties) throws RepositoryException, SQLException { + synchronized (entries) { + Iterator i = entries.iterator(); + while (i.hasNext()) { + ConnectionEntry e = (ConnectionEntry) i.next(); + if (e.isAvailable() + && e.getConnectionProperties().equals(properties)) { + e.setAvailable(false); + e.touch(); + return e.getConnection(); + } + } + + if (entries.size() >= maxConnections) { + // try remove at least one connection + removeOldestAvailableEntry(); + } + + if (entries.size() >= maxConnections) { + if (!isBlock()) { + throw new RepositoryException( + "Couldn't get any more database connections."); + } else { + return null; + } + } + + Connection connection = ConnectionFactory.getConnection( + properties.getDriver(), properties.getUrl(), properties + .getUser(), properties.getPassword()); + ConnectionEntry entry = new ConnectionEntry(connection, properties, false); + entries.add(entry); + return connection; + } + } + + private Connection getConnection(ConnectionProperties properties) throws RepositoryException, SQLException { + Connection connection = null; + while (connection == null) { + connection = tryGetConnection(properties); + if (connection == null) { + try { + final int sleep = 500; + log.info("No available connections in pool, waiting " + sleep + " ms"); + Thread.sleep(sleep); + } catch (InterruptedException ignore) { + + } + } + } + return connection; + } + + private void removeOldestAvailableEntry() { + ConnectionEntry oldest = null; + + for (Iterator i = entries.iterator(); i.hasNext();) { + ConnectionEntry entry = (ConnectionEntry) i.next(); + if (entry.isAvailable() && (oldest == null || oldest.getLastAccessed() > entry.getLastAccessed())) { + oldest = entry; + } + } + + if (oldest != null) { + entries.remove(oldest); + try { + oldest.getConnection().close(); + } catch (SQLException e) { + log.error("Error closing connection", e); + } + } + } + + private void returnConnection(Connection connection) { + try { + connection.rollback(); + connection.setAutoCommit(true); + } catch (SQLException e) { + log.error("Error rollbacking connection", e); + } finally { + synchronized(entries) { + for (Iterator i = entries.iterator(); i.hasNext();) { + ConnectionEntry entry = (ConnectionEntry) i.next(); + if (entry.connection == connection) { + entry.setAvailable(true); + if (entries.size() > getMinConnections()) { + removeOldestAvailableEntry(); + } + return; + } + } + } + } + } + + private void dispose() { + for (Iterator i = entries.iterator(); i.hasNext();) { + ConnectionEntry entry = (ConnectionEntry) i.next(); + try { + entry.getConnection().close(); + } catch (SQLException e) { + log.error("Error closing connection ", e); + } + } + } + }; + + private static final Logger log = LoggerFactory + .getLogger(SimplePoolingConnectionProvider.class); +} Index: src/main/java/org/apache/jackrabbit/core/persistence/PMContext.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/PMContext.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/PMContext.java (working copy) @@ -16,13 +16,15 @@ */ package org.apache.jackrabbit.core.persistence; +import java.io.File; + +import javax.jcr.NamespaceRegistry; + +import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.data.DataStore; import org.apache.jackrabbit.core.fs.FileSystem; import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry; -import org.apache.jackrabbit.core.NodeId; - -import javax.jcr.NamespaceRegistry; -import java.io.File; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; /** * A PMContext is used to provide context information for a @@ -61,6 +63,8 @@ * Data store for binary properties. */ private final DataStore dataStore; + + private final ConnectionProvider connectionProvider; /** * Creates a new PMContext. @@ -76,7 +80,8 @@ NodeId rootNodeId, NamespaceRegistry nsReg, NodeTypeRegistry ntReg, - DataStore dataStore) { + DataStore dataStore, + ConnectionProvider connectionProvider) { this.physicalHomeDir = homeDir; this.fs = fs; this.rootNodeId = rootNodeId; @@ -83,6 +88,7 @@ this.nsReg = nsReg; this.ntReg = ntReg; this.dataStore = dataStore; + this.connectionProvider = connectionProvider; } @@ -136,4 +142,8 @@ public DataStore getDataStore() { return dataStore; } + + public ConnectionProvider getConnectionProvider() { + return connectionProvider; + } } Index: src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (working copy) @@ -16,10 +16,35 @@ */ package org.apache.jackrabbit.core; -import EDU.oswego.cs.dl.util.concurrent.Mutex; -import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; -import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock; -import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import javax.jcr.AccessDeniedException; +import javax.jcr.Credentials; +import javax.jcr.LoginException; +import javax.jcr.NamespaceRegistry; +import javax.jcr.NoSuchWorkspaceException; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.observation.Event; +import javax.jcr.observation.EventIterator; +import javax.jcr.observation.EventListener; +import javax.jcr.observation.ObservationManager; +import javax.security.auth.Subject; + import org.apache.commons.collections.map.ReferenceMap; import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.api.JackrabbitRepository; @@ -31,6 +56,7 @@ import org.apache.jackrabbit.core.cluster.UpdateEventChannel; import org.apache.jackrabbit.core.cluster.UpdateEventListener; import org.apache.jackrabbit.core.config.ClusterConfig; +import org.apache.jackrabbit.core.config.ConnectionProviderConfig; import org.apache.jackrabbit.core.config.DataStoreConfig; import org.apache.jackrabbit.core.config.FileSystemConfig; import org.apache.jackrabbit.core.config.PersistenceManagerConfig; @@ -52,6 +78,8 @@ import org.apache.jackrabbit.core.observation.ObservationDispatcher; import org.apache.jackrabbit.core.persistence.PMContext; import org.apache.jackrabbit.core.persistence.PersistenceManager; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; +import org.apache.jackrabbit.core.persistence.bundle.util.SimplePoolingConnectionProvider; import org.apache.jackrabbit.core.security.JackrabbitSecurityManager; import org.apache.jackrabbit.core.security.authentication.AuthContext; import org.apache.jackrabbit.core.state.CacheManager; @@ -71,33 +99,10 @@ import org.slf4j.LoggerFactory; import org.xml.sax.InputSource; -import javax.jcr.AccessDeniedException; -import javax.jcr.Credentials; -import javax.jcr.LoginException; -import javax.jcr.NamespaceRegistry; -import javax.jcr.NoSuchWorkspaceException; -import javax.jcr.RepositoryException; -import javax.jcr.Session; -import javax.jcr.observation.Event; -import javax.jcr.observation.EventIterator; -import javax.jcr.observation.EventListener; -import javax.jcr.observation.ObservationManager; -import javax.security.auth.Subject; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.security.AccessControlContext; -import java.security.AccessController; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Set; +import EDU.oswego.cs.dl.util.concurrent.Mutex; +import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; +import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock; +import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; /** * A RepositoryImpl ... @@ -429,7 +434,8 @@ rootNodeId, nsReg, ntReg, - dataStore); + dataStore, + getConnectionProvider()); ISMLocking ismLocking = vConfig.getISMLockingConfig().createISMLocking(); @@ -1135,6 +1141,14 @@ // finally release repository lock repLock.release(); + + if (connectionProvider != null) { + try { + connectionProvider.dispose(); + } catch (Exception e) { + log.error("Error while disposing ConnectionProvider.", e); + } + } log.info("Repository has been shutdown"); } @@ -1271,11 +1285,12 @@ NodeId rootNodeId, NamespaceRegistry nsReg, NodeTypeRegistry ntReg, - DataStore dataStore) + DataStore dataStore, + ConnectionProvider connectionProvider) throws RepositoryException { try { PersistenceManager pm = (PersistenceManager) pmConfig.newInstance(); - pm.init(new PMContext(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore)); + pm.init(new PMContext(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore, connectionProvider)); return pm; } catch (Exception e) { String msg = "Cannot instantiate persistence manager " + pmConfig.getClassName(); @@ -1844,7 +1859,8 @@ rootNodeId, nsReg, ntReg, - dataStore); + dataStore, + getConnectionProvider()); ISMLocking ismLocking = config.getISMLockingConfig().createISMLocking(); @@ -2176,4 +2192,40 @@ return RepositoryImpl.this.getDataStore(); } } + + private ConnectionProvider connectionProvider; + + protected ConnectionProvider createDefaultConnectionProvider() throws RepositoryException { + return new SimplePoolingConnectionProvider(); + } + + /** + * Creates a new connection provider. If there is a ConnectionProvider element + * in the configuration file it will be used. Otherwise default {@link ConnectionProvider} + * will be created. + * + * @return + * @throws RepositoryException + */ + protected ConnectionProvider createConnectionProvider() throws RepositoryException { + ConnectionProviderConfig config = getConfig().getConnectionProviderConfig(); + if (config != null) { + return config.createConnectionProvider(); + } else { + return createDefaultConnectionProvider(); + } + } + + /** + * Returns the {@link ConnectionProvider} for this repository + * + * @return + * @throws RepositoryException + */ + public ConnectionProvider getConnectionProvider() throws RepositoryException { + if (connectionProvider == null) { + connectionProvider = createConnectionProvider(); + } + return connectionProvider; + }; } Index: src/main/java/org/apache/jackrabbit/core/state/PMContext.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/state/PMContext.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/state/PMContext.java (working copy) @@ -16,6 +16,10 @@ */ package org.apache.jackrabbit.core.state; +import java.io.File; + +import javax.jcr.NamespaceRegistry; + import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.data.DataStore; import org.apache.jackrabbit.core.fs.FileSystem; @@ -20,9 +24,7 @@ import org.apache.jackrabbit.core.data.DataStore; import org.apache.jackrabbit.core.fs.FileSystem; import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry; - -import javax.jcr.NamespaceRegistry; -import java.io.File; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; /** * Legacy class kept for backward compatibility reasons. @@ -32,7 +34,7 @@ public class PMContext extends org.apache.jackrabbit.core.persistence.PMContext { public PMContext(File homeDir, FileSystem fs, NodeId rootNodeId, - NamespaceRegistry nsReg, NodeTypeRegistry ntReg, DataStore dataStore) { - super(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore); + NamespaceRegistry nsReg, NodeTypeRegistry ntReg, DataStore dataStore, ConnectionProvider connectionProvider) { + super(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore, connectionProvider); } }