Index: src/main/java/org/apache/jackrabbit/core/config/FileSystemConfig.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/config/FileSystemConfig.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/config/FileSystemConfig.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.core.config; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; /** @@ -41,10 +42,10 @@ * @return new initialized file system instance. * @throws ConfigurationException on file system initialization errors */ - public FileSystem createFileSystem() throws ConfigurationException { + public FileSystem createFileSystem(FileSystemContext context) throws ConfigurationException { try { FileSystem fs = (FileSystem) newInstance(); - fs.init(); + fs.init(context); return fs; } catch (ClassCastException e) { throw new ConfigurationException( Index: src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java (working copy) @@ -16,22 +16,6 @@ */ package org.apache.jackrabbit.core.config; -import org.apache.commons.io.IOUtils; -import org.apache.jackrabbit.core.fs.FileSystem; -import org.apache.jackrabbit.core.fs.FileSystemException; -import org.apache.jackrabbit.core.fs.FileSystemPathUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.Element; -import org.xml.sax.InputSource; - -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerConfigurationException; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; @@ -48,6 +32,27 @@ import java.util.Map; import java.util.Properties; +import javax.jcr.RepositoryException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerConfigurationException; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; +import org.apache.jackrabbit.core.fs.FileSystemException; +import org.apache.jackrabbit.core.fs.FileSystemPathUtil; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; +import org.apache.jackrabbit.core.persistence.bundle.util.SimplePoolingConnectionProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Element; +import org.xml.sax.InputSource; + /** * Repository configuration. This configuration class is used to * create configured repository objects. @@ -224,6 +229,11 @@ private final DataStoreConfig dataStoreConfig; /** + * Optional connection provider class. + */ + private final ConnectionProviderConfig connectionProviderConfig; + + /** * Creates a repository configuration object. * * @param home repository home directory @@ -244,7 +254,8 @@ String workspaceDirectory, String workspaceConfigDirectory, String defaultWorkspace, int workspaceMaxIdleTime, Element template, VersioningConfig vc, SearchConfig sc, - ClusterConfig cc, DataStoreConfig dataStoreConfig, RepositoryConfigurationParser parser) { + ClusterConfig cc, DataStoreConfig dataStoreConfig, + ConnectionProviderConfig connectionProviderConfig, RepositoryConfigurationParser parser) { workspaces = new HashMap(); this.home = home; this.sec = sec; @@ -258,6 +269,7 @@ this.sc = sc; this.cc = cc; this.dataStoreConfig = dataStoreConfig; + this.connectionProviderConfig = connectionProviderConfig; this.parser = parser; } @@ -286,7 +298,7 @@ // a configuration directoy had been specified; search for // workspace configurations in virtual repository file system // rather than in physical workspace root directory on disk - FileSystem fs = fsc.createFileSystem(); + FileSystem fs = fsc.createFileSystem(new FileSystemContext(getConnectionProvider())); try { if (!fs.exists(workspaceConfigDirectory)) { fs.createFolder(workspaceConfigDirectory); @@ -482,7 +494,7 @@ // a configuration directoy had been specified; // workspace configurations are maintained in // virtual repository file system - virtualFS = fsc.createFileSystem(); + virtualFS = fsc.createFileSystem(new FileSystemContext(getConnectionProvider())); } else { // workspace configurations are maintained on disk virtualFS = null; @@ -753,6 +765,65 @@ public DataStoreConfig getDataStoreConfig() { return dataStoreConfig; } + + /** + * Returns configured {@link ConnectionProviderConfig} + * + * @return {@link ConnectionProviderConfig} instance or null + * if no provider has been configured. + */ + public ConnectionProviderConfig getConnectionProviderConfig() { + return connectionProviderConfig; + } + + private ConnectionProvider connectionProvider; + + protected ConnectionProvider createDefaultConnectionProvider() { + return new SimplePoolingConnectionProvider(); + } + + /** + * Creates a new connection provider. If there is a ConnectionProvider element + * in the configuration file it will be used. Otherwise default {@link ConnectionProvider} + * will be created. + * + * @return + * @throws ConfigurationException + * @throws RepositoryException + */ + protected ConnectionProvider createConnectionProvider() throws ConfigurationException { + ConnectionProviderConfig config = getConnectionProviderConfig(); + ConnectionProvider provider; + if (config != null) { + provider = config.createConnectionProvider(); + } else { + provider = createDefaultConnectionProvider(); + } + return provider; + } + + /** + * Returns the {@link ConnectionProvider} for this repository + * + * @return + * @throws RepositoryException + */ + public ConnectionProvider getConnectionProvider() throws ConfigurationException { + if (connectionProvider == null) { + connectionProvider = createConnectionProvider(); + } + return connectionProvider; + }; + + public void dispose() { + if (connectionProvider != null) { + try { + connectionProvider.dispose(); + } catch (Exception e) { + log.error("Error while disposing ConnectionProvider.", e); + } + } + } } Index: src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (working copy) @@ -16,6 +16,9 @@ */ package org.apache.jackrabbit.core.config; +import java.io.File; +import java.util.Properties; + import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; @@ -21,9 +24,6 @@ import org.w3c.dom.NodeList; import org.xml.sax.InputSource; -import java.io.File; -import java.util.Properties; - /** * Configuration parser. This class is used to parse the repository and * workspace configuration files. @@ -146,6 +146,12 @@ private static final String AC_PROVIDER_ELEMENT = "AccessControlProvider"; /** + * Name of optional ConnectionProvider element for configuring custom + * ConnectionProvider implementations. + */ + private static final String CONNECTION_PROVIDER_ELEMENT = "ConnectionProvider"; + + /** * Creates a new configuration parser with the given parser variables. * * @param variables parser variables @@ -248,9 +254,15 @@ // Optional data store configuration DataStoreConfig dsc = parseDataStoreConfig(root); + Element connectionProviderElement = getElement(root, CONNECTION_PROVIDER_ELEMENT, false); + ConnectionProviderConfig cpc = null; + if (connectionProviderElement != null) { + cpc = new ConnectionProviderConfig(parseBeanConfig(root, CONNECTION_PROVIDER_ELEMENT)); + } + return new RepositoryConfig(home, securityConfig, fsc, workspaceDirectory, workspaceConfigDirectory, defaultWorkspace, - maxIdleTime, template, vc, sc, cc, dsc, this); + maxIdleTime, template, vc, sc, cc, dsc, cpc, this); } /** @@ -507,7 +519,7 @@ Element element = getElement(parent, WSP_SECURITY_ELEMENT, false); if (element != null) { Element provFact = getElement(element, AC_PROVIDER_ELEMENT, false); - if (provFact !=null ) { + if (provFact != null) { factConf = parseBeanConfig(element, AC_PROVIDER_ELEMENT); } } Index: src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java (revision 0) @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.config; + +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; + +/** + * BeanConfig for {@link ConnectionProvider} configuration. + */ +public class ConnectionProviderConfig extends BeanConfig { + + public ConnectionProviderConfig(BeanConfig config) { + super(config); + } + + /** + * Instantiates and initializes the configured connection provider + * implementation class. + * + * @return new initialized connection provider instance. + * @throws ConfigurationException on file system initialization errors + */ + public ConnectionProvider createConnectionProvider() throws ConfigurationException { + try { + ConnectionProvider cp = (ConnectionProvider) newInstance(); + return cp; + } catch (ClassCastException e) { + throw new ConfigurationException( + "Invalid file system implementation class " + + getClassName() + ".", e); + } + } +} Index: src/main/java/org/apache/jackrabbit/core/data/DataStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/data/DataStore.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/data/DataStore.java (working copy) @@ -102,10 +102,10 @@ /** * Initialized the data store * - * @param homeDir the home directory of the repository + * @param context DataStoreContext * @throws RepositoryException */ - void init(String homeDir) throws RepositoryException; + void init(DataStoreContext context) throws RepositoryException; /** * Get the minimum size of an object that should be stored in this data store. Index: src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (revision 678172) +++ src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (working copy) @@ -16,18 +16,6 @@ */ package org.apache.jackrabbit.core.data.db; -import org.apache.jackrabbit.core.data.DataIdentifier; -import org.apache.jackrabbit.core.data.DataRecord; -import org.apache.jackrabbit.core.data.DataStore; -import org.apache.jackrabbit.core.data.DataStoreException; -import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager; -import org.apache.jackrabbit.core.persistence.bundle.util.TrackingInputStream; -import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager.StreamWrapper; -import org.apache.jackrabbit.util.Text; -import org.apache.jackrabbit.uuid.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.File; @@ -37,9 +25,11 @@ import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -49,6 +39,21 @@ import javax.jcr.RepositoryException; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStore; +import org.apache.jackrabbit.core.data.DataStoreContext; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager; +import org.apache.jackrabbit.core.persistence.bundle.util.StreamWrapper; +import org.apache.jackrabbit.core.persistence.bundle.util.TrackingInputStream; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties; +import org.apache.jackrabbit.util.Text; +import org.apache.jackrabbit.uuid.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A data store implementation that stores the records in a database using JDBC. * @@ -61,7 +66,6 @@ * <param name="{@link #setDatabaseType(String) databaseType}" value="postgresql"/> * <param name="{@link #setDriver(String) driver}" value="org.postgresql.Driver"/> * <param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/> - * <param name="{@link #setMaxConnections(int) maxConnections}" value="2"/> * <param name="{@link #setCopyWhenReading(int) copyWhenReading}" value="true"/> * <param name="{@link #setTablePrefix(int) tablePrefix}" value=""/> * </DataStore> @@ -149,14 +153,9 @@ protected int minRecordLength = DEFAULT_MIN_RECORD_LENGTH; /** - * The maximum number of open connections. - */ - protected int maxConnections = DEFAULT_MAX_CONNECTIONS; - - /** - * A list of connections + * Connection Manager */ - protected Pool connectionPool; + protected ConnectionManager connectionManager; /** * The prefix used for temporary objects. @@ -283,9 +282,10 @@ public DataRecord addRecord(InputStream stream) throws DataStoreException { ResultSet rs = null; TempFileInputStream fileInput = null; - ConnectionRecoveryManager conn = getConnection(); + Connection connection = null; try { - conn.setAutoReconnect(false); + connection = getConnection(); + String id = null, tempId = null; long now; for (int i = 0; i < ConnectionRecoveryManager.TRIALS; i++) { @@ -293,7 +293,7 @@ now = System.currentTimeMillis(); id = UUID.randomUUID().toString(); tempId = TEMP_PREFIX + id; - PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{tempId}); + PreparedStatement prep = connectionManager.executeStmt(connection, selectMetaSQL, new Object[]{tempId}); rs = prep.getResultSet(); if (rs.next()) { // re-try in the very, very unlikely event that the row already exists @@ -299,7 +299,7 @@ // re-try in the very, very unlikely event that the row already exists continue; } - conn.executeStmt(insertTempSQL, new Object[]{tempId, new Long(now)}); + connectionManager.executeStmt(connection, insertTempSQL, new Object[]{tempId, new Long(now)}); break; } catch (Exception e) { throw convert("Can not insert new record", e); @@ -326,7 +326,7 @@ } else { throw new DataStoreException("Unsupported stream store algorithm: " + storeStream); } - conn.executeStmt(updateDataSQL, new Object[]{wrapper, tempId}); + connectionManager.executeStmt(connection, updateDataSQL, new Object[]{wrapper, tempId}); now = System.currentTimeMillis(); long length = in.getPosition(); DataIdentifier identifier = new DataIdentifier(digest.digest()); @@ -335,7 +335,7 @@ // UPDATE DATASTORE SET ID=?, LENGTH=?, LAST_MODIFIED=? // WHERE ID=? // AND NOT EXISTS(SELECT ID FROM DATASTORE WHERE ID=?) - PreparedStatement prep = conn.executeStmt(updateSQL, new Object[]{ + PreparedStatement prep = connectionManager.executeStmt(connection, updateSQL, new Object[]{ id, new Long(length), new Long(now), tempId, id}); int count = prep.getUpdateCount(); @@ -342,9 +342,9 @@ if (count == 0) { // update count is 0, meaning such a row already exists // DELETE FROM DATASTORE WHERE ID=? - conn.executeStmt(deleteSQL, new Object[]{tempId}); + connectionManager.executeStmt(connection, deleteSQL, new Object[]{tempId}); // SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID=? - prep = conn.executeStmt(selectMetaSQL, new Object[]{id}); + prep = connectionManager.executeStmt(connection, selectMetaSQL, new Object[]{id}); rs = prep.getResultSet(); if (rs.next()) { long oldLength = rs.getLong(1); @@ -362,7 +362,7 @@ } usesIdentifier(identifier); DbDataRecord record = new DbDataRecord(this, identifier, length, now); - conn.setAutoReconnect(true); + return record; } catch (Exception e) { throw convert("Can not insert new record", e); @@ -367,8 +367,8 @@ } catch (Exception e) { throw convert("Can not insert new record", e); } finally { - conn.closeSilently(rs); - putBack(conn); + closeSilently(rs); + putBack(connection); if (fileInput != null) { try { fileInput.close(); @@ -397,8 +397,9 @@ * {@inheritDoc} */ public synchronized int deleteAllOlderThan(long min) throws DataStoreException { - ConnectionRecoveryManager conn = getConnection(); + Connection connection = null; try { + connection = getConnection(); Iterator it = new ArrayList(inUse.keySet()).iterator(); while (it.hasNext()) { DataIdentifier identifier = (DataIdentifier) it.next(); @@ -407,7 +408,7 @@ } } // DELETE FROM DATASTORE WHERE LAST_MODIFIED 0); - } catch (SQLException e) { - String msg = "failed to determine child count of file system entry: " + path; - log.error(msg, e); - throw new FileSystemException(msg, e); - } finally { - closeResultSet(rs); + Connection connection = null; + ResultSet rs = null; + + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt( + connection, selectChildCountSQL, new Object[]{path}); + + rs = stmt.getResultSet(); + if (!rs.next()) { + return false; + } + int count = rs.getInt(1); + if (FileSystemPathUtil.denotesRoot(path)) { + // ingore file system root entry + count--; } + return (count > 0); + } catch (Exception e) { + String msg = "failed to determine child count of file system entry: " + path; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + closeResultSet(rs); + connectionManager.close(connection); } } @@ -534,30 +530,33 @@ throw new FileSystemException("no such folder: " + folderPath); } - synchronized (selectFileAndFolderNamesSQL) { - ResultSet rs = null; - try { - Statement stmt = executeStmt( - selectFileAndFolderNamesSQL, new Object[]{folderPath}); - rs = stmt.getResultSet(); - ArrayList names = new ArrayList(); - while (rs.next()) { - String name = rs.getString(1); - if (name.length() == 0 - && FileSystemPathUtil.denotesRoot(folderPath)) { - // this is the file system root entry, skip... - continue; - } - names.add(name); + Connection connection = null; + ResultSet rs = null; + + try { + connection = connectionManager.getConnection(); + + Statement stmt = connectionManager.executeStmt( + connection, selectFileAndFolderNamesSQL, new Object[]{folderPath}); + rs = stmt.getResultSet(); + ArrayList names = new ArrayList(); + while (rs.next()) { + String name = rs.getString(1); + if (name.length() == 0 + && FileSystemPathUtil.denotesRoot(folderPath)) { + // this is the file system root entry, skip... + continue; } - return (String[]) names.toArray(new String[names.size()]); - } catch (SQLException e) { - String msg = "failed to list child entries of folder: " + folderPath; - log.error(msg, e); - throw new FileSystemException(msg, e); - } finally { - closeResultSet(rs); + names.add(name); } + return (String[]) names.toArray(new String[names.size()]); + } catch (Exception e) { + String msg = "failed to list child entries of folder: " + folderPath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + closeResultSet(rs); + connectionManager.close(connection); } } @@ -575,24 +574,27 @@ throw new FileSystemException("no such folder: " + folderPath); } - synchronized (selectFileNamesSQL) { - ResultSet rs = null; - try { - Statement stmt = executeStmt( - selectFileNamesSQL, new Object[]{folderPath}); - rs = stmt.getResultSet(); - ArrayList names = new ArrayList(); - while (rs.next()) { - names.add(rs.getString(1)); - } - return (String[]) names.toArray(new String[names.size()]); - } catch (SQLException e) { - String msg = "failed to list file entries of folder: " + folderPath; - log.error(msg, e); - throw new FileSystemException(msg, e); - } finally { - closeResultSet(rs); + Connection connection = null; + ResultSet rs = null; + + try { + connection = connectionManager.getConnection(); + + Statement stmt = connectionManager.executeStmt( + connection, selectFileNamesSQL, new Object[]{folderPath}); + rs = stmt.getResultSet(); + ArrayList names = new ArrayList(); + while (rs.next()) { + names.add(rs.getString(1)); } + return (String[]) names.toArray(new String[names.size()]); + } catch (Exception e) { + String msg = "failed to list file entries of folder: " + folderPath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + closeResultSet(rs); + connectionManager.close(connection); } } @@ -610,30 +612,33 @@ throw new FileSystemException("no such folder: " + folderPath); } - synchronized (selectFolderNamesSQL) { - ResultSet rs = null; - try { - Statement stmt = executeStmt( - selectFolderNamesSQL, new Object[]{folderPath}); - rs = stmt.getResultSet(); - ArrayList names = new ArrayList(); - while (rs.next()) { - String name = rs.getString(1); - if (name.length() == 0 - && FileSystemPathUtil.denotesRoot(folderPath)) { - // this is the file system root entry, skip... - continue; - } - names.add(name); + Connection connection = null; + ResultSet rs = null; + + try { + connection = connectionManager.getConnection(); + + Statement stmt = connectionManager.executeStmt( + connection, selectFolderNamesSQL, new Object[]{folderPath}); + rs = stmt.getResultSet(); + ArrayList names = new ArrayList(); + while (rs.next()) { + String name = rs.getString(1); + if (name.length() == 0 + && FileSystemPathUtil.denotesRoot(folderPath)) { + // this is the file system root entry, skip... + continue; } - return (String[]) names.toArray(new String[names.size()]); - } catch (SQLException e) { - String msg = "failed to list folder entries of folder: " + folderPath; - log.error(msg, e); - throw new FileSystemException(msg, e); - } finally { - closeResultSet(rs); + names.add(name); } + return (String[]) names.toArray(new String[names.size()]); + } catch (Exception e) { + String msg = "failed to list folder entries of folder: " + folderPath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + closeResultSet(rs); + connectionManager.close(connection); } } @@ -651,18 +656,21 @@ String name = FileSystemPathUtil.getName(filePath); int count = 0; - synchronized (updateLastModifiedSQL) { - try { - Statement stmt = executeStmt(updateLastModifiedSQL, new Object[]{ - new Long(System.currentTimeMillis()), - parentDir, - name}); - count = stmt.getUpdateCount(); - } catch (SQLException e) { - String msg = "failed to touch file: " + filePath; - log.error(msg, e); - throw new FileSystemException(msg, e); - } + Connection connection = null; + + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, updateLastModifiedSQL, new Object[]{ + new Long(System.currentTimeMillis()), + parentDir, + name}); + count = stmt.getUpdateCount(); + } catch (Exception e) { + String msg = "failed to touch file: " + filePath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + connectionManager.close(connection); } if (count == 0) { @@ -683,32 +691,47 @@ String parentDir = FileSystemPathUtil.getParentDir(filePath); String name = FileSystemPathUtil.getName(filePath); - synchronized (selectDataSQL) { - try { - Statement stmt = executeStmt( - selectDataSQL, new Object[]{parentDir, name}); + Connection connection = null; + ResultSet rs = null; + + try { + connection = connectionManager.getConnection(); + + Statement stmt = connectionManager.executeStmt( + connection, selectDataSQL, new Object[]{parentDir, name}); - final ResultSet rs = stmt.getResultSet(); - if (!rs.next()) { - throw new FileSystemException("no such file: " + filePath); - } - InputStream in = rs.getBinaryStream(1); - /** - * return an InputStream wrapper in order to - * close the ResultSet when the stream is closed - */ - return new FilterInputStream(in) { - public void close() throws IOException { - super.close(); - // close ResultSet - closeResultSet(rs); - } - }; - } catch (SQLException e) { - String msg = "failed to retrieve data of file: " + filePath; - log.error(msg, e); - throw new FileSystemException(msg, e); + rs = stmt.getResultSet(); + if (!rs.next()) { + throw new FileSystemException("no such file: " + filePath); } + InputStream in = rs.getBinaryStream(1); + + // prevent closing connection and resultset in the finally block + final Connection c = connection; + final ResultSet r = rs; + connection = null; + rs = null; + + /** + * return an InputStream wrapper in order to + * close the ResultSet when the stream is closed + */ + return new FilterInputStream(in) { + public void close() throws IOException { + // close ResultSet + closeResultSet(r); + connectionManager.close(c); + + super.close(); + } + }; + } catch (Exception e) { + String msg = "failed to retrieve data of file: " + filePath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + closeResultSet(rs); + connectionManager.close(connection); } } @@ -744,33 +767,34 @@ super.close(); InputStream in = null; + Connection connection = null; try { + connection = connectionManager.getConnection(); + if (isFile(filePath)) { - synchronized (updateDataSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - executeStmt(updateDataSQL, - new Object[]{ - new SizedInputStream(in, length), - new Long(System.currentTimeMillis()), - new Long(length), - parentDir, - name - }); - } + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + connectionManager.executeStmt(connection, updateDataSQL, + new Object[]{ + new StreamWrapper(in, length), + new Long(System.currentTimeMillis()), + new Long(length), + parentDir, + name + }); + } else { - synchronized (insertFileSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - executeStmt(insertFileSQL, - new Object[]{ - parentDir, - name, - new SizedInputStream(in, length), - new Long(System.currentTimeMillis()), - new Long(length) - }); - } + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + connectionManager.executeStmt(connection, insertFileSQL, + new Object[]{ + parentDir, + name, + new StreamWrapper(in, length), + new Long(System.currentTimeMillis()), + new Long(length) + }); + } } catch (Exception e) { @@ -778,6 +802,8 @@ ioe.initCause(e); throw ioe; } finally { + connectionManager.close(connection); + if (in != null) { in.close(); } @@ -841,33 +867,32 @@ raf.close(); InputStream in = null; + Connection connection = null; try { + connection = connectionManager.getConnection(); + if (isFile(filePath)) { - synchronized (updateDataSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - executeStmt(updateDataSQL, - new Object[]{ - new SizedInputStream(in, length), - new Long(System.currentTimeMillis()), - new Long(length), - parentDir, - name - }); - } + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + connectionManager.executeStmt(connection, updateDataSQL, + new Object[]{ + new StreamWrapper(in, length), + new Long(System.currentTimeMillis()), + new Long(length), + parentDir, + name + }); } else { - synchronized (insertFileSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - executeStmt(insertFileSQL, - new Object[]{ - parentDir, - name, - new SizedInputStream(in, length), - new Long(System.currentTimeMillis()), - new Long(length) - }); - } + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + connectionManager.executeStmt(connection, insertFileSQL, + new Object[]{ + parentDir, + name, + new StreamWrapper(in, length), + new Long(System.currentTimeMillis()), + new Long(length) + }); } } catch (Exception e) { @@ -875,6 +900,8 @@ ioe.initCause(e); throw ioe; } finally { + connectionManager.close(connection); + if (in != null) { in.close(); } @@ -952,159 +979,15 @@ //----------------------------------< misc. helper methods & overridables > - /** - * Initializes the database connection used by this file system. - *

- * Subclasses should normally override the {@link #getConnection()} - * method instead of this one. The default implementation calls - * {@link #getConnection()} to get the database connection and disables - * the autocommit feature. - * - * @throws Exception if an error occurs - */ - protected void initConnection() throws Exception { - con = getConnection(); - // JCR-1013: Setter may fail unnecessarily on a managed connection - if (!con.getAutoCommit()) { - con.setAutoCommit(true); - } - } - /** - * Abstract factory method for creating a new database connection. This - * method is called by {@link #initConnection()} when the file system is - * started. The returned connection should come with the default JDBC - * settings, as the {@link #initConnection()} method will explicitly set - * the autoCommit and other properties as needed. - *

- * Note that the returned database connection is kept during the entire - * lifetime of the file system, after which it is closed by - * {@link #close()} using the {@link #closeConnection(Connection)} method. - * - * @return new connection - * @throws Exception if an error occurs - */ - protected Connection getConnection() throws Exception { + protected ConnectionManager getConnectionManager(FileSystemContext context) throws Exception { throw new UnsupportedOperationException("Override in a subclass!"); } - /** - * Closes the given database connection. This method is called by - * {@link #close()} to close the connection acquired using - * {@link #getConnection()} when the file system was started. - *

- * The default implementation just calls the {@link Connection#close()} - * method of the given connection, but subclasses can override this - * method to provide more extensive database and connection cleanup. - * - * @param connection database connection - * @throws SQLException if an error occurs - */ - protected void closeConnection(Connection connection) throws SQLException { - connection.close(); - } - - /** - * Re-establishes the database connection. This method is called by - * {@link #executeStmt(String, Object[])} after a SQLException - * had been encountered. - * - * @return true if the connection could be successfully re-established, - * false otherwise. - */ - protected synchronized boolean reestablishConnection() { - // in any case try to shut down current connection - // gracefully in order to avoid potential memory leaks - - // close shared prepared statements - Iterator it = preparedStatements.values().iterator(); - while (it.hasNext()) { - closeStatement((PreparedStatement) it.next()); - } - try { - closeConnection(con); - } catch (Exception ignore) { - } - - // sleep for a while to give database a chance - // to restart before a reconnect is attempted - - try { - Thread.sleep(SLEEP_BEFORE_RECONNECT); - } catch (InterruptedException ignore) { - } - - // now try to re-establish connection - - try { - initConnection(); - initPreparedStatements(); - return true; - } catch (Exception e) { - log.error("failed to re-establish connection", e); - // reconnect failed - return false; - } - } - - /** - * Executes the given SQL statement with the specified parameters. - * If a SQLException is encountered one attempt is made - * to re-establish the database connection and re-execute the statement. - * - * @param sql statement to execute - * @param params parameters to set - * @return the Statement object that had been executed - * @throws SQLException if an error occurs - */ - protected Statement executeStmt(String sql, Object[] params) - throws SQLException { - int trials = 2; - while (true) { - PreparedStatement stmt = (PreparedStatement) preparedStatements.get(sql); - try { - for (int i = 0; i < params.length; i++) { - if (params[i] instanceof SizedInputStream) { - SizedInputStream in = (SizedInputStream) params[i]; - stmt.setBinaryStream(i + 1, in, (int) in.getSize()); - } else { - stmt.setObject(i + 1, params[i]); - } - } - stmt.execute(); - resetStatement(stmt); - return stmt; - } catch (SQLException se) { - if (--trials == 0) { - // no more trials, re-throw - throw se; - } - log.warn("execute failed, about to reconnect...", se.getMessage()); - - // try to reconnect - if (reestablishConnection()) { - // reconnect succeeded; check whether it's possible to - // re-execute the prepared stmt with the given parameters - for (int i = 0; i < params.length; i++) { - if (params[i] instanceof SizedInputStream) { - SizedInputStream in = (SizedInputStream) params[i]; - if (in.isConsumed()) { - // we're unable to re-execute the prepared stmt - // since an InputStream paramater has already - // been 'consumed'; - // re-throw previous SQLException - throw se; - } - } - } - - // try again to execute the statement - continue; - } else { - // reconnect failed, re-throw previous SQLException - throw se; - } - } + protected void initConnectionManager(FileSystemContext context) throws Exception { + connectionManager = getConnectionManager(context); + if (connectionManager == null) { + throw new FileSystemException("Failed to init ConnectionManager. ConnectionManager may not be null."); } } @@ -1116,25 +999,31 @@ * @throws Exception if an error occurs */ protected void prepareSchemaObjectPrefix() throws Exception { - DatabaseMetaData metaData = con.getMetaData(); - String legalChars = metaData.getExtraNameCharacters(); - legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; + Connection connection = null; + try { + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + String legalChars = metaData.getExtraNameCharacters(); + legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; - String prefix = schemaObjectPrefix.toUpperCase(); - StringBuffer escaped = new StringBuffer(); - for (int i = 0; i < prefix.length(); i++) { - char c = prefix.charAt(i); - if (legalChars.indexOf(c) == -1) { - escaped.append("_x"); - String hex = Integer.toHexString(c); - escaped.append("0000".toCharArray(), 0, 4 - hex.length()); - escaped.append(hex); - escaped.append("_"); - } else { - escaped.append(c); + String prefix = schemaObjectPrefix.toUpperCase(); + StringBuffer escaped = new StringBuffer(); + for (int i = 0; i < prefix.length(); i++) { + char c = prefix.charAt(i); + if (legalChars.indexOf(c) == -1) { + escaped.append("_x"); + String hex = Integer.toHexString(c); + escaped.append("0000".toCharArray(), 0, 4 - hex.length()); + escaped.append(hex); + escaped.append("_"); + } else { + escaped.append(c); + } } + schemaObjectPrefix = escaped.toString(); + } finally { + connectionManager.close(connection); } - schemaObjectPrefix = escaped.toString(); } /** @@ -1144,48 +1033,54 @@ * @throws Exception if an error occurs */ protected void checkSchema() throws Exception { - DatabaseMetaData metaData = con.getMetaData(); - String tableName = schemaObjectPrefix + "FSENTRY"; - if (metaData.storesLowerCaseIdentifiers()) { - tableName = tableName.toLowerCase(); - } else if (metaData.storesUpperCaseIdentifiers()) { - tableName = tableName.toUpperCase(); - } - ResultSet rs = metaData.getTables(null, null, tableName, null); - boolean schemaExists; + Connection connection = null; try { - schemaExists = rs.next(); - } finally { - rs.close(); - } - - if (!schemaExists) { - // read ddl from resources - InputStream in = DatabaseFileSystem.class.getResourceAsStream(schema + ".ddl"); - if (in == null) { - String msg = "Configuration error: unknown schema '" + schema + "'"; - log.debug(msg); - throw new RepositoryException(msg); + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + String tableName = schemaObjectPrefix + "FSENTRY"; + if (metaData.storesLowerCaseIdentifiers()) { + tableName = tableName.toLowerCase(); + } else if (metaData.storesUpperCaseIdentifiers()) { + tableName = tableName.toUpperCase(); } - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - Statement stmt = con.createStatement(); + ResultSet rs = metaData.getTables(null, null, tableName, null); + boolean schemaExists; try { - String sql = reader.readLine(); - while (sql != null) { - // Skip comments and empty lines - if (!sql.startsWith("#") && sql.length() > 0) { - // replace prefix variable - sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix); - // execute sql stmt - stmt.executeUpdate(sql); + schemaExists = rs.next(); + } finally { + rs.close(); + } + + if (!schemaExists) { + // read ddl from resources + InputStream in = DatabaseFileSystem.class.getResourceAsStream(schema + ".ddl"); + if (in == null) { + String msg = "Configuration error: unknown schema '" + schema + "'"; + log.debug(msg); + throw new RepositoryException(msg); + } + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + Statement stmt = connection.createStatement(); + try { + String sql = reader.readLine(); + while (sql != null) { + // Skip comments and empty lines + if (!sql.startsWith("#") && sql.length() > 0) { + // replace prefix variable + sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix); + // execute sql stmt + stmt.executeUpdate(sql); + } + // read next sql stmt + sql = reader.readLine(); } - // read next sql stmt - sql = reader.readLine(); + } finally { + IOUtils.closeQuietly(in); + closeStatement(stmt); } - } finally { - IOUtils.closeQuietly(in); - closeStatement(stmt); } + } finally { + connectionManager.close(connection); } } @@ -1283,51 +1178,6 @@ } /** - * Initializes the map of prepared statements. - * - * @throws SQLException if an error occurs - */ - protected void initPreparedStatements() throws SQLException { - preparedStatements.put( - selectExistSQL, con.prepareStatement(selectExistSQL)); - preparedStatements.put( - selectFileExistSQL, con.prepareStatement(selectFileExistSQL)); - preparedStatements.put( - selectFolderExistSQL, con.prepareStatement(selectFolderExistSQL)); - preparedStatements.put( - selectChildCountSQL, con.prepareStatement(selectChildCountSQL)); - preparedStatements.put( - selectDataSQL, con.prepareStatement(selectDataSQL)); - preparedStatements.put( - selectLastModifiedSQL, con.prepareStatement(selectLastModifiedSQL)); - preparedStatements.put( - selectLengthSQL, con.prepareStatement(selectLengthSQL)); - preparedStatements.put( - selectFileNamesSQL, con.prepareStatement(selectFileNamesSQL)); - preparedStatements.put( - selectFolderNamesSQL, con.prepareStatement(selectFolderNamesSQL)); - preparedStatements.put( - selectFileAndFolderNamesSQL, con.prepareStatement(selectFileAndFolderNamesSQL)); - preparedStatements.put( - deleteFileSQL, con.prepareStatement(deleteFileSQL)); - preparedStatements.put( - deleteFolderSQL, con.prepareStatement(deleteFolderSQL)); - preparedStatements.put( - insertFileSQL, con.prepareStatement(insertFileSQL)); - preparedStatements.put( - insertFolderSQL, con.prepareStatement(insertFolderSQL)); - preparedStatements.put( - updateDataSQL, con.prepareStatement(updateDataSQL)); - preparedStatements.put( - updateLastModifiedSQL, con.prepareStatement(updateLastModifiedSQL)); - preparedStatements.put( - copyFileSQL, con.prepareStatement(copyFileSQL)); - preparedStatements.put( - copyFilesSQL, con.prepareStatement(copyFilesSQL)); - - } - - /** * Verifies that the root file system entry exists. If it doesn't exist yet * it will be automatically created. * @@ -1335,27 +1185,29 @@ */ protected void verifyRootExists() throws Exception { // check if root file system entry exists - synchronized (selectFolderExistSQL) { - ResultSet rs = null; - try { - Statement stmt = executeStmt( - selectFolderExistSQL, - new Object[]{FileSystem.SEPARATOR, ""}); - rs = stmt.getResultSet(); + Connection connection = null; + ResultSet rs = null; - if (rs.next()) { - // root entry exists - return; - } - } catch (SQLException e) { - String msg = "failed to check existence of file system root entry"; - log.error(msg, e); - throw new FileSystemException(msg, e); - } finally { - closeResultSet(rs); + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, selectFolderExistSQL, + new Object[]{FileSystem.SEPARATOR, ""}); + rs = stmt.getResultSet(); + + if (rs.next()) { + // root entry exists + return; } + } catch (Exception e) { + String msg = "failed to check existence of file system root entry"; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + closeResultSet(rs); + connectionManager.close(connection); } + // the root entry doesn't exist yet, create it... createDeepFolder(FileSystem.SEPARATOR); } @@ -1378,19 +1230,21 @@ } } - synchronized (insertFolderSQL) { - try { - executeStmt( - insertFolderSQL, - new Object[]{ - parentDir, - name, - new Long(System.currentTimeMillis())}); - } catch (SQLException e) { - String msg = "failed to create folder entry: " + folderPath; - log.error(msg, e); - throw new FileSystemException(msg, e); - } + Connection connection = null; + + try { + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, insertFolderSQL, + new Object[]{ + parentDir, + name, + new Long(System.currentTimeMillis())}); + } catch (Exception e) { + String msg = "failed to create folder entry: " + folderPath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1418,14 +1272,16 @@ copyDeepFolder(src, dest); } - synchronized (copyFilesSQL) { - try { - executeStmt(copyFilesSQL, new Object[]{destPath, srcPath}); - } catch (SQLException e) { - String msg = "failed to copy file entries from " + srcPath + " to " + destPath; - log.error(msg, e); - throw new FileSystemException(msg, e); - } + Connection connection = null; + try { + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, copyFilesSQL, new Object[]{destPath, srcPath}); + } catch (Exception e) { + String msg = "failed to copy file entries from " + srcPath + " to " + destPath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1456,21 +1312,24 @@ } int count = 0; - synchronized (copyFileSQL) { - try { - Statement stmt = executeStmt( - copyFileSQL, - new Object[]{ - destParentDir, - destName, - srcParentDir, - srcName}); - count = stmt.getUpdateCount(); - } catch (SQLException e) { - String msg = "failed to copy file from " + srcPath + " to " + destPath; - log.error(msg, e); - throw new FileSystemException(msg, e); - } + Connection connection = null; + + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt( + connection, copyFileSQL, + new Object[]{ + destParentDir, + destName, + srcParentDir, + srcName}); + count = stmt.getUpdateCount(); + } catch (Exception e) { + String msg = "failed to copy file from " + srcPath + " to " + destPath; + log.error(msg, e); + throw new FileSystemException(msg, e); + } finally { + connectionManager.close(connection); } if (count == 0) { @@ -1478,28 +1337,6 @@ } } - /** - * Resets the given PreparedStatement by clearing the parameters - * and warnings contained. - *

- * NOTE: This method MUST be called in a synchronized context as neither - * this method nor the PreparedStatement instance on which it - * operates are thread safe. - * - * @param stmt The PreparedStatement to reset. If - * null this method does nothing. - */ - protected void resetStatement(PreparedStatement stmt) { - if (stmt != null) { - try { - stmt.clearParameters(); - stmt.clearWarnings(); - } catch (SQLException se) { - log.error("failed resetting PreparedStatement", se); - } - } - } - protected void closeResultSet(ResultSet rs) { if (rs != null) { try { @@ -1510,6 +1347,10 @@ } } + /** + * closes the statement + * @param stmt the statement + */ protected void closeStatement(Statement stmt) { if (stmt != null) { try { @@ -1515,7 +1356,7 @@ try { stmt.close(); } catch (SQLException se) { - log.error("failed closing Statement", se); + log.error("Failed closing Statement", se); } } } @@ -1520,43 +1361,4 @@ } } - //--------------------------------------------------------< inner classes > - - class SizedInputStream extends FilterInputStream { - private final long size; - private boolean consumed = false; - - SizedInputStream(InputStream in, long size) { - super(in); - this.size = size; - } - - long getSize() { - return size; - } - - boolean isConsumed() { - return consumed; - } - - public int read() throws IOException { - consumed = true; - return super.read(); - } - - public long skip(long n) throws IOException { - consumed = true; - return super.skip(n); - } - - public int read(byte[] b) throws IOException { - consumed = true; - return super.read(b); - } - - public int read(byte[] b, int off, int len) throws IOException { - consumed = true; - return super.read(b, off, len); - } - } } Index: src/main/java/org/apache/jackrabbit/core/fs/db/DbFileSystem.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/db/DbFileSystem.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/fs/db/DbFileSystem.java (working copy) @@ -16,12 +16,10 @@ */ package org.apache.jackrabbit.core.fs.db; -import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionFactory; - -import java.sql.Connection; -import java.sql.SQLException; - -import javax.jcr.RepositoryException; +import org.apache.jackrabbit.core.fs.FileSystemContext; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties; /** * DbFileSystem is a generic JDBC-based FileSystem @@ -187,13 +185,21 @@ //--------------------------------------------------< DatabaseFileSystem > - /** - * Initialize the JDBC connection. - * - * @throws SQLException if an error occurs - */ - protected Connection getConnection() throws RepositoryException, SQLException { - return ConnectionFactory.getConnection(driver, url, user, password); + protected ConnectionManager getConnectionManager(FileSystemContext context) + throws Exception { + + ConnectionProperties properties = new ConnectionProperties(); + properties.setDriver(getDriver()); + properties.setUrl(getUrl()); + properties.setUser(getUser()); + properties.setPassword(getPassword()); + + ConnectionProvider provider = context.getConnectionProvider(); + if (provider == null) { + throw new IllegalStateException("FileSystemContext must provide valid ConnectionProvider."); + } + + return new ConnectionManager(provider, properties, true); } } Index: src/main/java/org/apache/jackrabbit/core/fs/db/DerbyFileSystem.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/db/DerbyFileSystem.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/fs/db/DerbyFileSystem.java (working copy) @@ -16,13 +16,16 @@ */ package org.apache.jackrabbit.core.fs.db; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import javax.jcr.RepositoryException; + +import org.apache.jackrabbit.core.fs.FileSystemException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Connection; - /** * DerbyFileSystem is a JDBC-based FileSystem * implementation for Jackrabbit that persists file system entries in an @@ -40,7 +43,7 @@ *

  • user: the database user (default: null)
  • *
  • password: the user's password (default: null)
  • *
  • shutdownOnClose: if true (the default) the - * database is shutdown when the last connection is closed; + * database is shutdown when the file system is closed; * set this to false when using a standalone database
  • * * See also {@link DbFileSystem}. @@ -88,48 +91,56 @@ //-----------------------------------------------< DbFileSystem overrides > - /** - * Closes the given connection and shuts down the embedded Derby - * database if shutdownOnClose is set to true. - * - * @param connection database connection - * @throws SQLException if an error occurs - * @see DatabaseFileSystem#closeConnection(Connection) - */ - protected void closeConnection(Connection connection) throws SQLException { - // prepare connection url for issuing shutdown command - String url; - try { - url = connection.getMetaData().getURL(); - } catch (SQLException e) { - // JCR-1557: embedded derby db probably already shut down; - // this happens when configuring multiple FS/PM instances - // to use the same embedded derby db instance. - log.debug("failed to retrieve connection url: embedded db probably already shut down", e); + /** name of the embedded driver */ + public static final String DERBY_EMBEDDED_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver"; + + private void shutDown() throws RepositoryException, SQLException { + // check for embedded driver + if (!DERBY_EMBEDDED_DRIVER.equals(getDriver())) { return; } - int pos = url.lastIndexOf(';'); - if (pos != -1) { - // strip any attributes from connection url - url = url.substring(0, pos); + + Connection connection = null; + try { + connection = connectionManager.getConnection(); + // prepare connection url for issuing shutdown command + String url = connection.getMetaData().getURL(); + int pos = url.lastIndexOf(';'); + if (pos != -1) { + // strip any attributes from connection url + url = url.substring(0, pos); + } + url += ";shutdown=true"; + + // we have to reset the connection to 'autoCommit=true' before closing it; + // otherwise Derby would mysteriously complain about some pending uncommitted + // changes which can't possibly be true. + // @todo further investigate + connection.setAutoCommit(true); + + } finally { + connectionManager.close(connection); + } + + // now it's safe to shutdown the embedded Derby database + try { + DriverManager.getConnection(url); + } catch (SQLException e) { + // a shutdown command always raises a SQLException + log.info(e.getMessage()); } - url += ";shutdown=true"; + }; - // we have to reset the connection to 'autoCommit=true' before closing it; - // otherwise Derby would mysteriously complain about some pending uncommitted - // changes which can't possibly be true. - // @todo further investigate - connection.setAutoCommit(true); - connection.close(); + public void close() throws FileSystemException { if (shutdownOnClose) { - // now it's safe to shutdown the embedded Derby database try { - DriverManager.getConnection(url); - } catch (SQLException e) { - // a shutdown command always raises a SQLException - log.info(e.getMessage()); + shutDown(); + } catch (Exception e) { + throw new FileSystemException("Error shutting down DerbyFileSystem", e); } } + + super.close(); } } Index: src/main/java/org/apache/jackrabbit/core/fs/db/JNDIDatabaseFileSystem.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/db/JNDIDatabaseFileSystem.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/fs/db/JNDIDatabaseFileSystem.java (working copy) @@ -37,6 +37,8 @@ * WARNING: The acquired database connection is kept * for the entire lifetime of the file system instance. The configured data * source should be prepared for this. + * + * THIS CLASS NO LONGER WORKS WITH CONNECTION POOLING */ public class JNDIDatabaseFileSystem extends DatabaseFileSystem { Index: src/main/java/org/apache/jackrabbit/core/fs/db/OracleFileSystem.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/db/OracleFileSystem.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/fs/db/OracleFileSystem.java (working copy) @@ -19,6 +19,7 @@ import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.util.Text; import org.apache.jackrabbit.util.TransientFileFactory; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.FileSystemPathUtil; import org.apache.jackrabbit.core.fs.RandomAccessOutputStream; @@ -135,8 +136,8 @@ * @see oracle.sql.BLOB#DURATION_SESSION * @see oracle.sql.BLOB#MODE_READWRITE */ - public void init() throws FileSystemException { - super.init(); + public void init(FileSystemContext context) throws FileSystemException { + super.init(context); // initialize oracle.sql.BLOB class & constants @@ -142,8 +143,10 @@ // use the Connection object for using the exact same // class loader that the Oracle driver was loaded with + Connection connection = null; try { - blobClass = con.getClass().getClassLoader().loadClass("oracle.sql.BLOB"); + connection = connectionManager.getConnection(); + blobClass = connection.getClass().getClassLoader().loadClass("oracle.sql.BLOB"); durationSessionConstant = new Integer(blobClass.getField("DURATION_SESSION").getInt(null)); modeReadWriteConstant = @@ -152,6 +155,8 @@ String msg = "failed to load/introspect oracle.sql.BLOB"; log.error(msg, e); throw new FileSystemException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -165,60 +170,67 @@ * @throws Exception if an error occurs */ protected void checkSchema() throws Exception { - DatabaseMetaData metaData = con.getMetaData(); - String tableName = schemaObjectPrefix + "FSENTRY"; - if (metaData.storesLowerCaseIdentifiers()) { - tableName = tableName.toLowerCase(); - } else if (metaData.storesUpperCaseIdentifiers()) { - tableName = tableName.toUpperCase(); - } - String userName = metaData.getUserName(); - - ResultSet rs = metaData.getTables(null, userName, tableName, null); - boolean schemaExists; + Connection connection = null; try { - schemaExists = rs.next(); - } finally { - rs.close(); - } + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + String tableName = schemaObjectPrefix + "FSENTRY"; + if (metaData.storesLowerCaseIdentifiers()) { + tableName = tableName.toLowerCase(); + } else if (metaData.storesUpperCaseIdentifiers()) { + tableName = tableName.toUpperCase(); + } + String userName = metaData.getUserName(); - if (!schemaExists) { - // read ddl from resources - InputStream in = OracleFileSystem.class.getResourceAsStream(schema + ".ddl"); - if (in == null) { - String msg = "Configuration error: unknown schema '" + schema + "'"; - log.debug(msg); - throw new RepositoryException(msg); - } - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - Statement stmt = con.createStatement(); + ResultSet rs = metaData.getTables(null, userName, tableName, null); + boolean schemaExists; try { - String sql = reader.readLine(); - while (sql != null) { - // Skip comments and empty lines - if (!sql.startsWith("#") && sql.length() > 0) { - // replace prefix variable - sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix); + schemaExists = rs.next(); + } finally { + rs.close(); + } - // set the tablespace if it is defined - String tspace; - if (tableSpace == null || "".equals(tableSpace)) { - tspace = ""; - } else { - tspace = "tablespace " + tableSpace; + if (!schemaExists) { + // read ddl from resources + InputStream in = OracleFileSystem.class.getResourceAsStream(schema + ".ddl"); + if (in == null) { + String msg = "Configuration error: unknown schema '" + schema + "'"; + log.debug(msg); + throw new RepositoryException(msg); + } + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + Statement stmt = connection.createStatement(); + try { + String sql = reader.readLine(); + while (sql != null) { + // Skip comments and empty lines + if (!sql.startsWith("#") && sql.length() > 0) { + // replace prefix variable + sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix); + + // set the tablespace if it is defined + String tspace; + if (tableSpace == null || "".equals(tableSpace)) { + tspace = ""; + } else { + tspace = "tablespace " + tableSpace; + } + sql = Text.replace(sql, TABLE_SPACE_VARIABLE, tspace).trim(); + + // execute sql stmt + stmt.executeUpdate(sql); } - sql = Text.replace(sql, TABLE_SPACE_VARIABLE, tspace).trim(); - - // execute sql stmt - stmt.executeUpdate(sql); + // read next sql stmt + sql = reader.readLine(); } - // read next sql stmt - sql = reader.readLine(); + } finally { + IOUtils.closeQuietly(in); + closeStatement(stmt); } - } finally { - IOUtils.closeQuietly(in); - closeStatement(stmt); } + + } finally { + connectionManager.close(connection); } } @@ -368,35 +380,33 @@ InputStream in = null; Blob blob = null; + Connection connection = null; try { + connection = connectionManager.getConnection(); if (isFile(filePath)) { - synchronized (updateDataSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - blob = createTemporaryBlob(in); - executeStmt(updateDataSQL, - new Object[]{ - blob, - new Long(System.currentTimeMillis()), - new Long(length), - parentDir, - name - }); - } + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + blob = createTemporaryBlob(in); + connectionManager.executeStmt(connection, updateDataSQL, + new Object[]{ + blob, + new Long(System.currentTimeMillis()), + new Long(length), + parentDir, + name + }); } else { - synchronized (insertFileSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - blob = createTemporaryBlob(in); - executeStmt(insertFileSQL, - new Object[]{ - parentDir, - name, - blob, - new Long(System.currentTimeMillis()), - new Long(length) - }); - } + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + blob = createTemporaryBlob(in); + connectionManager.executeStmt(connection, insertFileSQL, + new Object[]{ + parentDir, + name, + blob, + new Long(System.currentTimeMillis()), + new Long(length) + }); } } catch (Exception e) { IOException ioe = new IOException(e.getMessage()); @@ -409,6 +419,7 @@ } catch (Exception e1) { } } + connectionManager.close(connection); IOUtils.closeQuietly(in); // temp file can now safely be removed tmpFile.delete(); @@ -472,35 +483,37 @@ InputStream in = null; Blob blob = null; + Connection connection = null; try { + connection = connectionManager.getConnection(); if (isFile(filePath)) { - synchronized (updateDataSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - blob = createTemporaryBlob(in); - executeStmt(updateDataSQL, - new Object[]{ - blob, - new Long(System.currentTimeMillis()), - new Long(length), - parentDir, - name - }); - } + + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + blob = createTemporaryBlob(in); + connectionManager.executeStmt(connection, updateDataSQL, + new Object[]{ + blob, + new Long(System.currentTimeMillis()), + new Long(length), + parentDir, + name + }); + } else { - synchronized (insertFileSQL) { - long length = tmpFile.length(); - in = new FileInputStream(tmpFile); - blob = createTemporaryBlob(in); - executeStmt(insertFileSQL, - new Object[]{ - parentDir, - name, - blob, - new Long(System.currentTimeMillis()), - new Long(length) - }); - } + + long length = tmpFile.length(); + in = new FileInputStream(tmpFile); + blob = createTemporaryBlob(in); + connectionManager.executeStmt(connection, insertFileSQL, + new Object[]{ + parentDir, + name, + blob, + new Long(System.currentTimeMillis()), + new Long(length) + }); + } } catch (Exception e) { IOException ioe = new IOException(e.getMessage()); @@ -513,6 +526,7 @@ } catch (Exception e1) { } } + connectionManager.close(connection); IOUtils.closeQuietly(in); // temp file can now safely be removed tmpFile.delete(); @@ -562,27 +576,33 @@ blob.close(); return blob; */ - Method createTemporary = blobClass.getMethod("createTemporary", - new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE}); - Object blob = createTemporary.invoke(null, - new Object[]{con, Boolean.FALSE, durationSessionConstant}); - Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE}); - open.invoke(blob, new Object[]{modeReadWriteConstant}); - Method getBinaryOutputStream = + Connection connection = null; + try { + connection = connectionManager.getConnection(); + Method createTemporary = blobClass.getMethod("createTemporary", + new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE}); + Object blob = createTemporary.invoke(null, + new Object[]{connection, Boolean.FALSE, durationSessionConstant}); + Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE}); + open.invoke(blob, new Object[]{modeReadWriteConstant}); + Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]); - OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null); - try { - IOUtils.copy(in, out); - } finally { + OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null); try { - out.flush(); - } catch (IOException ioe) { + IOUtils.copy(in, out); + } finally { + try { + out.flush(); + } catch (IOException ioe) { + } + out.close(); } - out.close(); + Method close = blobClass.getMethod("close", new Class[0]); + close.invoke(blob, null); + return (Blob) blob; + } finally { + connectionManager.close(connection); } - Method close = blobClass.getMethod("close", new Class[0]); - close.invoke(blob, null); - return (Blob) blob; } /** Index: src/main/java/org/apache/jackrabbit/core/fs/FileSystem.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/FileSystem.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/fs/FileSystem.java (working copy) @@ -43,9 +43,11 @@ /** * Initialize the file system * + * @param context file system context + * * @throws FileSystemException if the file system initialization fails */ - void init() throws FileSystemException; + void init(FileSystemContext fileSystemContext) throws FileSystemException; /** * Close the file system. After calling this method, the file system is no Index: src/main/java/org/apache/jackrabbit/core/fs/local/LocalFileSystem.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/local/LocalFileSystem.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/fs/local/LocalFileSystem.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.core.fs.local; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.RandomAccessOutputStream; import org.apache.jackrabbit.util.LazyFileInputStream; @@ -143,7 +144,7 @@ /** * {@inheritDoc} */ - public void init() throws FileSystemException { + public void init(FileSystemContext fileSystemContext) throws FileSystemException { if (root == null) { String msg = "root directory not set"; log.debug(msg); Index: src/main/java/org/apache/jackrabbit/core/fs/mem/MemoryFileSystem.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/mem/MemoryFileSystem.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/fs/mem/MemoryFileSystem.java (working copy) @@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.RandomAccessOutputStream; @@ -166,7 +167,7 @@ return list(path).length > 0; } - public void init() { + public void init(FileSystemContext context) { createFolderInternal("/"); } Index: src/main/java/org/apache/jackrabbit/core/fs/FileSystemContext.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/fs/FileSystemContext.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/fs/FileSystemContext.java (revision 0) @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.fs; + +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; + +/** + * {@link FileSystemContext} is used to pass contextual information to a {@link FileSystem}. + */ +public class FileSystemContext { + + private final ConnectionProvider connectionProvider; + + /** + * Creates new FileSystemContext + * + * @param connectionProvider + * Connection Provider. Not mandatory if the FileSystem doesn't + * use database. + */ + public FileSystemContext(ConnectionProvider connectionProvider) { + this.connectionProvider = connectionProvider; + } + + public ConnectionProvider getConnectionProvider() { + return connectionProvider; + } + +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (revision 681175) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (working copy) @@ -46,6 +46,7 @@ import org.apache.jackrabbit.core.NodeIdIterator; import org.apache.jackrabbit.core.PropertyId; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemResource; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; import org.apache.jackrabbit.core.persistence.PMContext; @@ -50,11 +51,12 @@ import org.apache.jackrabbit.core.fs.local.LocalFileSystem; import org.apache.jackrabbit.core.persistence.PMContext; import org.apache.jackrabbit.core.persistence.bundle.util.BundleBinding; -import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager; import org.apache.jackrabbit.core.persistence.bundle.util.DbNameIndex; import org.apache.jackrabbit.core.persistence.bundle.util.ErrorHandling; import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle; -import org.apache.jackrabbit.core.util.StringIndex; +import org.apache.jackrabbit.core.persistence.bundle.util.StreamWrapper; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties; import org.apache.jackrabbit.core.persistence.util.BLOBStore; import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore; import org.apache.jackrabbit.core.persistence.util.Serializer; @@ -63,6 +65,7 @@ import org.apache.jackrabbit.core.state.NoSuchItemStateException; import org.apache.jackrabbit.core.state.NodeReferences; import org.apache.jackrabbit.core.state.NodeReferencesId; +import org.apache.jackrabbit.core.util.StringIndex; import org.apache.jackrabbit.util.Text; import org.apache.jackrabbit.uuid.UUID; import org.slf4j.Logger; @@ -139,10 +142,7 @@ /** indicates whether to block if the database connection is lost */ protected boolean blockOnConnectionLoss = false; - /** - * The class that manages statement execution and recovery from connection loss. - */ - protected ConnectionRecoveryManager connectionManager; + protected ConnectionManager connectionManager; // SQL statements for bundle management protected String bundleInsertSQL; @@ -425,9 +425,11 @@ throw new RepositoryException(msg); } BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = connectionManager.getConnection(); + Statement stmt = null; String sql = null; try { + stmt = connection.createStatement(); sql = reader.readLine(); while (sql != null) { if (!sql.startsWith("#") && sql.length() > 0 @@ -451,7 +453,9 @@ throw se; } finally { IOUtils.closeQuietly(in); - stmt.close(); + connectionManager.close(connection, stmt); + connectionManager.close(connection); + } } } @@ -477,19 +481,24 @@ * @throws RepositoryException if a repository exception occurs. */ protected boolean checkTablesExist() throws SQLException, RepositoryException { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String tableName = schemaObjectPrefix + "BUNDLE"; - if (metaData.storesLowerCaseIdentifiers()) { - tableName = tableName.toLowerCase(); - } else if (metaData.storesUpperCaseIdentifiers()) { - tableName = tableName.toUpperCase(); - } - String userName = checkTablesWithUser() ? metaData.getUserName() : null; - ResultSet rs = metaData.getTables(null, userName, tableName, null); + Connection connection = connectionManager.getConnection(); try { - return rs.next(); + DatabaseMetaData metaData = connection.getMetaData(); + String tableName = schemaObjectPrefix + "BUNDLE"; + if (metaData.storesLowerCaseIdentifiers()) { + tableName = tableName.toLowerCase(); + } else if (metaData.storesUpperCaseIdentifiers()) { + tableName = tableName.toUpperCase(); + } + String userName = checkTablesWithUser() ? metaData.getUserName() : null; + ResultSet rs = metaData.getTables(null, userName, tableName, null); + try { + return rs.next(); + } finally { + rs.close(); + } } finally { - rs.close(); + connectionManager.close(connection); } } @@ -514,33 +523,34 @@ public synchronized void store(ChangeLog changeLog) throws ItemStateException { int trials = 2; Throwable lastException = null; - do { - trials--; - Connection con = null; - try { - con = connectionManager.getConnection(); - connectionManager.setAutoReconnect(false); - con.setAutoCommit(false); - super.store(changeLog); - con.commit(); - con.setAutoCommit(true); - return; - } catch (Throwable th) { - lastException = th; + Connection con = null; + try { + do { + trials--; try { - if (con != null) { - con.rollback(); + con = connectionManager.getConnection(); + con.setAutoCommit(false); + super.store(changeLog); + con.commit(); + con.setAutoCommit(true); + return; + } catch (Throwable th) { + lastException = th; + try { + if (con != null) { + con.rollback(); + } + } catch (SQLException e) { + logException("rollback failed", e); + } + if (th instanceof SQLException || th.getCause() instanceof SQLException) { + connectionManager.close(con); } - } catch (SQLException e) { - logException("rollback failed", e); - } - if (th instanceof SQLException || th.getCause() instanceof SQLException) { - connectionManager.close(); } - } finally { - connectionManager.setAutoReconnect(true); - } - } while(blockOnConnectionLoss || trials > 0); + } while(blockOnConnectionLoss || trials > 0); + } finally { + connectionManager.close(con); + } throw new ItemStateException(lastException.getMessage()); } @@ -555,8 +565,14 @@ this.name = context.getHomeDir().getName(); - connectionManager = new ConnectionRecoveryManager(blockOnConnectionLoss, - getDriver(), getUrl(), getUser(), getPassword()); + ConnectionProperties properties = new ConnectionProperties(); + properties.setUrl(getUrl()); + properties.setUser(getUser()); + properties.setPassword(getPassword()); + properties.setDriver(getDriver()); + + + connectionManager = new ConnectionManager(context.getConnectionProvider(), properties, true); // make sure schemaObjectPrefix consists of legal name characters only prepareSchemaObjectPrefix(); @@ -659,7 +675,7 @@ */ LocalFileSystem blobFS = new LocalFileSystem(); blobFS.setRoot(new File(context.getHomeDir(), "blobs")); - blobFS.init(); + blobFS.init(new FileSystemContext(context.getConnectionProvider())); return new FSBlobStore(blobFS); } @@ -763,9 +779,11 @@ // get all node bundles in the database with a single sql statement, // which is (probably) faster than loading each bundle and traversing the tree ResultSet rs = null; + Connection connection = null; try { + connection = connectionManager.getConnection(); String sql = "select count(*) from " + schemaObjectPrefix + "BUNDLE"; - Statement stmt = connectionManager.executeStmt(sql, new Object[0]); + Statement stmt = connectionManager.executeStmt(connection, sql, new Object[0]); try { rs = stmt.getResultSet(); if (!rs.next()) { @@ -781,7 +799,7 @@ } else { sql = "select NODE_ID_HI, NODE_ID_LO from " + schemaObjectPrefix + "BUNDLE"; } - stmt = connectionManager.executeStmt(sql, new Object[0]); + stmt = connectionManager.executeStmt(connection, sql, new Object[0]); rs = stmt.getResultSet(); // iterate over all node bundles in the db @@ -797,7 +815,7 @@ ResultSet bRs = null; byte[] data = null; try { - Statement bSmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + Statement bSmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); bRs = bSmt.getResultSet(); if (!bRs.next()) { throw new SQLException("bundle cannot be retrieved?"); @@ -808,7 +826,6 @@ closeResultSet(bRs); } - try { // parse and check bundle // checkBundle will log any problems itself @@ -833,6 +850,7 @@ log.error("Error loading bundle", e); } finally { closeResultSet(rs); + connectionManager.close(connection); total = count; } } else { @@ -917,25 +935,31 @@ * @throws Exception if an error occurs */ protected void prepareSchemaObjectPrefix() throws Exception { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String legalChars = metaData.getExtraNameCharacters(); - legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; + Connection connection = null; + try { + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + String legalChars = metaData.getExtraNameCharacters(); + legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; - String prefix = schemaObjectPrefix.toUpperCase(); - StringBuffer escaped = new StringBuffer(); - for (int i = 0; i < prefix.length(); i++) { - char c = prefix.charAt(i); - if (legalChars.indexOf(c) == -1) { - escaped.append("_x"); - String hex = Integer.toHexString(c); - escaped.append("0000".toCharArray(), 0, 4 - hex.length()); - escaped.append(hex); - escaped.append("_"); - } else { - escaped.append(c); + String prefix = schemaObjectPrefix.toUpperCase(); + StringBuffer escaped = new StringBuffer(); + for (int i = 0; i < prefix.length(); i++) { + char c = prefix.charAt(i); + if (legalChars.indexOf(c) == -1) { + escaped.append("_x"); + String hex = Integer.toHexString(c); + escaped.append("0000".toCharArray(), 0, 4 - hex.length()); + escaped.append(hex); + escaped.append("_"); + } else { + escaped.append(c); + } } + schemaObjectPrefix = escaped.toString(); + } finally { + connectionManager.close(connection); } - schemaObjectPrefix = escaped.toString(); } /** @@ -950,7 +974,6 @@ if (nameIndex instanceof DbNameIndex) { ((DbNameIndex) nameIndex).close(); } - connectionManager.close(); // close blob store blobStore.close(); blobStore = null; @@ -1035,6 +1058,7 @@ public synchronized NodeIdIterator getAllNodeIds(NodeId bigger, int maxCount) throws ItemStateException, RepositoryException { ResultSet rs = null; + Connection connection = null; try { UUID lowUuid; Object[] keys; @@ -1055,7 +1079,8 @@ // see also bundleSelectAllIdsFrom SQL statement maxCount += 10; } - Statement stmt = connectionManager.executeStmt(sql, keys, false, maxCount); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, sql, keys, false, maxCount); rs = stmt.getResultSet(); ArrayList result = new ArrayList(); while ((maxCount == 0 || result.size() < maxCount) && rs.next()) { @@ -1082,6 +1107,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1131,8 +1157,10 @@ protected synchronized NodePropBundle loadBundle(NodeId id, boolean checkBeforeLoading) throws ItemStateException { ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); rs = stmt.getResultSet(); if (!rs.next()) { return null; @@ -1160,6 +1188,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1168,8 +1197,10 @@ */ protected synchronized boolean existsBundle(NodeId id) throws ItemStateException { ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); rs = stmt.getResultSet(); // a bundle exists, if the result has at least one entry return rs.next(); @@ -1179,6 +1210,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1186,6 +1218,7 @@ * {@inheritDoc} */ protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException { + Connection connection = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); DataOutputStream dout = new DataOutputStream(out); @@ -1194,7 +1227,8 @@ String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL; Object[] params = createParams(bundle.getId().getUUID(), out.toByteArray(), true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); } catch (Exception e) { String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); @@ -1199,6 +1233,8 @@ String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1206,8 +1242,10 @@ * {@inheritDoc} */ protected synchronized void destroyBundle(NodePropBundle bundle) throws ItemStateException { + Connection connection = null; try { - connectionManager.executeStmt(bundleDeleteSQL, getKey(bundle.getId().getUUID())); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, bundleDeleteSQL, getKey(bundle.getId().getUUID())); } catch (Exception e) { if (e instanceof NoSuchItemStateException) { throw (NoSuchItemStateException) e; @@ -1215,6 +1253,8 @@ String msg = "failed to delete bundle: " + bundle.getId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1229,8 +1269,10 @@ ResultSet rs = null; InputStream in = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nodeReferenceSelectSQL, getKey(targetId.getTargetId().getUUID())); rs = stmt.getResultSet(); if (!rs.next()) { @@ -1252,6 +1294,7 @@ } finally { IOUtils.closeQuietly(in); closeResultSet(rs); + connectionManager.close(connection); } } @@ -1272,7 +1315,7 @@ // check if insert or update boolean update = exists(refs.getId()); String sql = (update) ? nodeReferenceUpdateSQL : nodeReferenceInsertSQL; - + Connection connection = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); @@ -1280,7 +1323,8 @@ Serializer.serialize(refs, out); Object[] params = createParams(refs.getTargetId().getUUID(), out.toByteArray(), true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); // there's no need to close a ByteArrayOutputStream //out.close(); @@ -1288,6 +1332,8 @@ String msg = "failed to write node references: " + refs.getId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1298,9 +1344,10 @@ if (!initialized) { throw new IllegalStateException("not initialized"); } - + Connection connection = null; try { - connectionManager.executeStmt(nodeReferenceDeleteSQL, + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, nodeReferenceDeleteSQL, getKey(refs.getTargetId().getUUID())); } catch (Exception e) { if (e instanceof NoSuchItemStateException) { @@ -1309,6 +1356,8 @@ String msg = "failed to delete references: " + refs.getTargetId(); log.error(msg, e); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection); } } @@ -1321,8 +1370,10 @@ } ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt(nodeReferenceSelectSQL, + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nodeReferenceSelectSQL, getKey(targetId.getTargetId().getUUID())); rs = stmt.getResultSet(); @@ -1335,6 +1386,7 @@ throw new ItemStateException(msg, e); } finally { closeResultSet(rs); + connectionManager.close(connection); } } @@ -1521,31 +1573,51 @@ * {@inheritDoc} */ public InputStream get(String blobId) throws Exception { - Statement stmt = connectionManager.executeStmt(blobSelectSQL, new Object[]{blobId}); - final ResultSet rs = stmt.getResultSet(); - if (!rs.next()) { - closeResultSet(rs); - throw new Exception("no such BLOB: " + blobId); - } - InputStream in = rs.getBinaryStream(1); - if (in == null) { - // some databases treat zero-length values as NULL; - // return empty InputStream in such a case - closeResultSet(rs); - return new ByteArrayInputStream(new byte[0]); - } - - /** - * return an InputStream wrapper in order to - * close the ResultSet when the stream is closed - */ - return new FilterInputStream(in) { - public void close() throws IOException { - in.close(); - // now it's safe to close ResultSet + Connection connection = null; + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, blobSelectSQL, new Object[]{blobId}); + final ResultSet rs = stmt.getResultSet(); + if (!rs.next()) { + closeResultSet(rs); + throw new Exception("no such BLOB: " + blobId); + } + InputStream in = rs.getBinaryStream(1); + if (in == null) { + // some databases treat zero-length values as NULL; + // return empty InputStream in such a case closeResultSet(rs); + return new ByteArrayInputStream(new byte[0]); } - }; + + final Connection c = connection; + + // prevent closing the connection in the finally block + connection = null; + + /** + * return an InputStream wrapper in order to + * close the ResultSet when the stream is closed + */ + return new FilterInputStream(in) { + private boolean closed = false; + public void close() throws IOException { + in.close(); + // now it's safe to close ResultSet + closeResultSet(rs); + connectionManager.close(c); + closed = true; + } + protected void finalize() throws Throwable { + if (!closed) { + close(); + } + super.finalize(); + } + }; + } finally { + connectionManager.close(connection); + } } /** @@ -1553,15 +1625,22 @@ */ public synchronized void put(String blobId, InputStream in, long size) throws Exception { - Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId}); - ResultSet rs = stmt.getResultSet(); - // a BLOB exists if the result has at least one entry - boolean exists = rs.next(); - closeResultSet(rs); + Connection connection = null; + try { + connection = connectionManager.getConnection(); + + Statement stmt = connectionManager.executeStmt(connection, blobSelectExistSQL, new Object[]{blobId}); + ResultSet rs = stmt.getResultSet(); + // a BLOB exists if the result has at least one entry + boolean exists = rs.next(); + closeResultSet(rs); - String sql = (exists) ? blobUpdateSQL : blobInsertSQL; - Object[] params = new Object[]{new ConnectionRecoveryManager.StreamWrapper(in, size), blobId}; - connectionManager.executeStmt(sql, params); + String sql = (exists) ? blobUpdateSQL : blobInsertSQL; + Object[] params = new Object[]{new StreamWrapper(in, size), blobId}; + connectionManager.executeStmt(connection, sql, params); + } finally { + connectionManager.close(connection); + } } /** @@ -1568,8 +1647,14 @@ * {@inheritDoc} */ public synchronized boolean remove(String blobId) throws Exception { - Statement stmt = connectionManager.executeStmt(blobDeleteSQL, new Object[]{blobId}); - return stmt.getUpdateCount() == 1; + Connection connection = null; + try { + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, blobDeleteSQL, new Object[]{blobId}); + return stmt.getUpdateCount() == 1; + } finally { + connectionManager.close(connection); + } } public void close() { Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java (working copy) @@ -21,6 +21,7 @@ import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.core.fs.FileSystem; import org.apache.jackrabbit.core.fs.BasedFileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; import org.apache.jackrabbit.core.persistence.PMContext; @@ -193,7 +194,7 @@ if (useLocalFsBlobStore()) { LocalFileSystem blobFS = new LocalFileSystem(); blobFS.setRoot(new File(context.getHomeDir(), "blobs")); - blobFS.init(); + blobFS.init(new FileSystemContext(context.getConnectionProvider())); blobStore = new BundleFsPersistenceManager.FSBlobStore(blobFS); } else { blobStore = new BundleFsPersistenceManager.FSBlobStore(itemFs); Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (working copy) @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory; import org.apache.jackrabbit.core.persistence.PMContext; +import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; @@ -260,8 +261,11 @@ protected void checkSchema() throws SQLException, RepositoryException { // set properties if (DERBY_EMBEDDED_DRIVER.equals(getDriver())) { - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = null; + Statement stmt = null; try { + connection = connectionManager.getConnection(); + stmt = connection.createStatement(); stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY " + "('derby.storage.initialPages', '" + derbyStorageInitialPages + "')"); stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY " @@ -274,7 +278,8 @@ + "('derby.storage.pageSize', '" + derbyStoragePageSize + "')"); } finally { - stmt.close(); + connectionManager.close(connection, stmt); + connectionManager.close(connection); } } super.checkSchema(); @@ -305,20 +310,26 @@ return; } - // prepare connection url for issuing shutdown command - String url = connectionManager.getConnection().getMetaData().getURL(); - int pos = url.lastIndexOf(';'); - if (pos != -1) { - // strip any attributes from connection url - url = url.substring(0, pos); - } - url += ";shutdown=true"; + Connection connection = null; + try { + connection = connectionManager.getConnection(); + // prepare connection url for issuing shutdown command + String url = connection.getMetaData().getURL(); + int pos = url.lastIndexOf(';'); + if (pos != -1) { + // strip any attributes from connection url + url = url.substring(0, pos); + } + url += ";shutdown=true"; - // we have to reset the connection to 'autoCommit=true' before closing it; - // otherwise Derby would mysteriously complain about some pending uncommitted - // changes which can't possibly be true. - // @todo further investigate - connectionManager.getConnection().setAutoCommit(true); + // we have to reset the connection to 'autoCommit=true' before closing it; + // otherwise Derby would mysteriously complain about some pending uncommitted + // changes which can't possibly be true. + // @todo further investigate + connection.setAutoCommit(true); + } finally { + connectionManager.close(connection); + } super.close(); Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (working copy) @@ -18,6 +18,7 @@ import org.apache.jackrabbit.core.persistence.PMContext; +import java.sql.Connection; import java.sql.Statement; import java.sql.SQLException; @@ -95,11 +96,15 @@ * {@inheritDoc} */ protected void checkSchema() throws SQLException, RepositoryException { - Statement stmt = connectionManager.getConnection().createStatement(); + Connection connection = null; + Statement stmt = null; try { + connection = connectionManager.getConnection(); + stmt = connection.createStatement(); stmt.execute("SET LOCK_TIMEOUT " + lockTimeout); } finally { - stmt.close(); + connectionManager.close(connection, stmt); + connectionManager.close(connection); } super.checkSchema(); } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (working copy) @@ -88,11 +88,17 @@ // use the Connection object for using the exact same // class loader that the Oracle driver was loaded with - blobClass = connectionManager.getConnection().getClass().getClassLoader().loadClass("oracle.sql.BLOB"); - duractionSessionConstant = - new Integer(blobClass.getField("DURATION_SESSION").getInt(null)); - modeReadWriteConstant = - new Integer(blobClass.getField("MODE_READWRITE").getInt(null)); + Connection connection = null; + try { + connection = connectionManager.getConnection(); + blobClass = connection.getClass().getClassLoader().loadClass("oracle.sql.BLOB"); + duractionSessionConstant = + new Integer(blobClass.getField("DURATION_SESSION").getInt(null)); + modeReadWriteConstant = + new Integer(blobClass.getField("MODE_READWRITE").getInt(null)); + } finally { + connectionManager.close(connection); + } } /** @@ -108,6 +114,7 @@ protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException { Blob blob = null; + Connection connection = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); DataOutputStream dout = new DataOutputStream(out); @@ -117,7 +124,8 @@ String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL; blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray())); Object[] params = createParams(bundle.getId().getUUID(), blob, true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); } catch (Exception e) { String msg = "failed to write bundle: " + bundle.getId(); log.error(msg, e); @@ -129,6 +137,7 @@ } catch (Exception e1) { } } + connectionManager.close(connection); } } @@ -142,6 +151,7 @@ } Blob blob = null; + Connection connection = null; try { // check if insert or update boolean update = exists(refs.getId()); @@ -156,7 +166,8 @@ blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray())); Object[] params = createParams(refs.getTargetId().getUUID(), blob, true); - connectionManager.executeStmt(sql, params); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, sql, params); // there's no need to close a ByteArrayOutputStream //out.close(); @@ -171,6 +182,7 @@ } catch (Exception e1) { } } + connectionManager.close(connection); } } @@ -190,26 +202,33 @@ blob.close(); return blob; */ - Method createTemporary = blobClass.getMethod("createTemporary", - new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE}); - Object blob = createTemporary.invoke(null, - new Object[]{connectionManager.getConnection(), Boolean.FALSE, duractionSessionConstant}); - Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE}); - open.invoke(blob, new Object[]{modeReadWriteConstant}); - Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]); - OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null); + + Connection connection = null; try { - IOUtils.copy(in, out); - } finally { + connection = connectionManager.getConnection(); + Method createTemporary = blobClass.getMethod("createTemporary", + new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE}); + Object blob = createTemporary.invoke(null, + new Object[]{connection, Boolean.FALSE, duractionSessionConstant}); + Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE}); + open.invoke(blob, new Object[]{modeReadWriteConstant}); + Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]); + OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null); try { - out.flush(); - } catch (IOException ioe) { + IOUtils.copy(in, out); + } finally { + try { + out.flush(); + } catch (IOException ioe) { + } + out.close(); } - out.close(); + Method close = blobClass.getMethod("close", new Class[0]); + close.invoke(blob, null); + return (Blob) blob; + } finally { + connectionManager.close(connection); } - Method close = blobClass.getMethod("close", new Class[0]); - close.invoke(blob, null); - return (Blob) blob; } /** @@ -234,8 +253,11 @@ throws Exception { Blob blob = null; + Connection connection = null; + Statement stmt = null; try { - Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId}); + connection = connectionManager.getConnection(); + stmt = connectionManager.executeStmt(connection, blobSelectExistSQL, new Object[]{blobId}); ResultSet rs = stmt.getResultSet(); // a BLOB exists if the result has at least one entry boolean exists = rs.next(); @@ -243,7 +265,7 @@ String sql = (exists) ? blobUpdateSQL : blobInsertSQL; blob = createTemporaryBlob(in); - connectionManager.executeStmt(sql, new Object[]{blob, blobId}); + connectionManager.executeStmt(connection, sql, new Object[]{blob, blobId}); } finally { if (blob != null) { try { @@ -251,6 +273,8 @@ } catch (Exception e) { } } + connectionManager.close(connection, stmt); + connectionManager.close(connection); } } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (working copy) @@ -16,6 +16,7 @@ */ package org.apache.jackrabbit.core.persistence.bundle; +import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.SQLException; @@ -104,8 +105,10 @@ super.init(context); // check driver version + Connection connection = null; try { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); if (metaData.getDriverMajorVersion() < 10) { // Oracle drivers prior to version 10 only support // writing BLOBs up to 32k in size... @@ -115,6 +118,8 @@ } } catch (SQLException e) { log.warn("Can not retrieve driver version", e); + } finally { + connectionManager.close(connection); } } @@ -159,20 +164,26 @@ * @inheritDoc */ protected void prepareSchemaObjectPrefix() throws Exception { - DatabaseMetaData metaData = connectionManager.getConnection().getMetaData(); - String legalChars = metaData.getExtraNameCharacters(); - legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; + Connection connection = null; + try { + connection = connectionManager.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + String legalChars = metaData.getExtraNameCharacters(); + legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; - String prefix = schemaObjectPrefix.toUpperCase(); - StringBuffer escaped = new StringBuffer(); - for (int i = 0; i < prefix.length(); i++) { - char c = prefix.charAt(i); - if (legalChars.indexOf(c) == -1) { - escaped.append('_'); - } else { - escaped.append(c); + String prefix = schemaObjectPrefix.toUpperCase(); + StringBuffer escaped = new StringBuffer(); + for (int i = 0; i < prefix.length(); i++) { + char c = prefix.charAt(i); + if (legalChars.indexOf(c) == -1) { + escaped.append('_'); + } else { + escaped.append(c); + } } + schemaObjectPrefix = escaped.toString(); + } finally { + connectionManager.close(connection); } - schemaObjectPrefix = escaped.toString(); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (working copy) @@ -28,6 +28,7 @@ import java.io.DataInputStream; import java.io.InputStream; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -90,8 +91,11 @@ protected synchronized NodePropBundle loadBundle(NodeId id) throws ItemStateException { + Connection connection = null; + Statement stmt = null; try { - Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID())); + connection = connectionManager.getConnection(); + stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID())); ResultSet rs = stmt.getResultSet(); try { if (rs.next()) { @@ -115,6 +119,9 @@ String msg = "failed to read bundle: " + id + ": " + e; log.error(msg); throw new ItemStateException(msg, e); + } finally { + connectionManager.close(connection, stmt); + connectionManager.close(connection); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java (revision 681176) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java (working copy) @@ -462,8 +462,8 @@ */ public static class StreamWrapper { - private final InputStream stream; - private final long size; + final InputStream stream; + final long size; /** * Creates a wrapper for the given InputStream that can Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (revision 681176) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (working copy) @@ -16,11 +16,11 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; -import java.util.HashMap; - +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; import org.apache.jackrabbit.core.util.StringIndex; @@ -40,7 +40,7 @@ /** * The class that manages statement execution and recovery from connection loss. */ - protected ConnectionRecoveryManager connectionManager; + protected ConnectionManager connectionManager; // name index statements protected String nameSelectSQL; @@ -57,7 +57,7 @@ * @param schemaObjectPrefix the prefix for table names * @throws SQLException if the statements cannot be prepared. */ - public DbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public DbNameIndex(ConnectionManager conMgr, String schemaObjectPrefix) throws SQLException { connectionManager = conMgr; init(schemaObjectPrefix); @@ -132,29 +132,24 @@ */ protected int insertString(String string) { // assert index does not exist - int result = -1; + ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( - nameInsertSQL, new Object[] { string }, true, 0); - ResultSet rs = stmt.getGeneratedKeys(); - try { - if (rs.next()) { - result = rs.getInt(1); - } - } finally { - rs.close(); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string}, true, 0); + rs = stmt.getGeneratedKeys(); + if (!rs.next()) { + return -1; + } else { + return rs.getInt(1); } } catch (Exception e) { - IllegalStateException ise = new IllegalStateException( - "Unable to insert index for string: " + string); + IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string); ise.initCause(e); throw ise; - } - if (result != -1) { - return result; - } else { - // Could not get the index with getGeneratedKeys, try with SELECT - return getIndex(string); + } finally { + connectionManager.close(connection); + closeResultSet(rs); } } @@ -164,24 +159,24 @@ * @return the index or -1 if not found. */ protected int getIndex(String string) { + ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( - indexSelectSQL, new Object[] { string }); - ResultSet rs = stmt.getResultSet(); - try { - if (rs.next()) { - return rs.getInt(1); - } else { - return -1; - } - } finally { - rs.close(); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, indexSelectSQL, new Object[]{string}); + rs = stmt.getResultSet(); + if (!rs.next()) { + return -1; + } else { + return rs.getInt(1); } } catch (Exception e) { - IllegalStateException ise = new IllegalStateException( - "Unable to read index for string: " + string); + IllegalStateException ise = new IllegalStateException("Unable to read index for string: " + string); ise.initCause(e); throw ise; + } finally { + connectionManager.close(connection); + closeResultSet(rs); } } @@ -188,33 +183,41 @@ /** * Retrieves the string from the database for the given index. * @param index the index to retrieve the string for. - * @return the string - * @throws IllegalArgumentException if the string is not found + * @return the string or null if not found. */ - protected String getString(int index) - throws IllegalArgumentException, IllegalStateException { - String result = null; + protected String getString(int index) { + ResultSet rs = null; + Connection connection = null; try { - Statement stmt = connectionManager.executeStmt( - nameSelectSQL, new Object[] { new Integer(index) }); - ResultSet rs = stmt.getResultSet(); - try { - if (rs.next()) { - result = rs.getString(1); - } - } finally { - rs.close(); + connection = connectionManager.getConnection(); + Statement stmt = connectionManager.executeStmt(connection, nameSelectSQL, new Object[]{new Integer(index)}); + rs = stmt.getResultSet(); + if (!rs.next()) { + return null; + } else { + return rs.getString(1); } } catch (Exception e) { - IllegalStateException ise = new IllegalStateException( - "Unable to read name for index: " + index); + IllegalStateException ise = new IllegalStateException("Unable to read name for index: " + index); ise.initCause(e); throw ise; + } finally { + connectionManager.close(connection); + closeResultSet(rs); } - if (result == null) { - throw new IllegalArgumentException("Index not found: " + index); + } + + /** + * Closes the result set + * @param rs the result set. + */ + protected void closeResultSet(ResultSet rs) { + if (rs != null) { + try { + rs.close(); + } catch (SQLException se) { + // ignore + } } - return result; } - } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (revision 681177) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (working copy) @@ -16,6 +16,8 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; +import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; /** @@ -31,7 +33,7 @@ * @param schemaObjectPrefix the prefix for table names * @throws SQLException if the statements cannot be prepared. */ - public NGKDbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public NGKDbNameIndex(ConnectionManager conMgr, String schemaObjectPrefix) throws SQLException { super(conMgr, schemaObjectPrefix); } @@ -57,15 +59,19 @@ */ protected int insertString(String string) { // assert index does not exist + ResultSet rs = null; + Connection connection = null; try { - connectionManager.executeStmt(nameInsertSQL, new Object[] { string }); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string}); + return getIndex(string); } catch (Exception e) { - IllegalStateException ise = new IllegalStateException( - "Unable to insert index for string: " + string); + IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string); ise.initCause(e); throw ise; + } finally { + connectionManager.close(connection); + closeResultSet(rs); } - - return getIndex(string); } } Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NodePropBundle.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NodePropBundle.java (revision 683292) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NodePropBundle.java (working copy) @@ -109,7 +109,7 @@ * the size */ private long size = 0; - + /** * Shared set, consisting of the parent ids of this shareable node. This * entry is null if this node is not shareable. @@ -433,7 +433,7 @@ pe.destroy(binding.getBlobStore()); } } - + /** * Sets the shared set of this bundle. * @return the shared set of this bundle. Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (revision 681176) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (working copy) @@ -16,6 +16,7 @@ */ package org.apache.jackrabbit.core.persistence.bundle.util; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -28,7 +29,7 @@ protected String generatedKeySelectSQL; - public PostgreSQLNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix) + public PostgreSQLNameIndex(ConnectionManager conMgr, String schemaObjectPrefix) throws SQLException { super(conMgr, schemaObjectPrefix); } @@ -59,13 +60,17 @@ */ protected int insertString(String string) { // assert index does not exist + Connection connection = null; try { - connectionManager.executeStmt(nameInsertSQL, new Object[]{string}); - return getGeneratedKey(); + connection = connectionManager.getConnection(); + connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string}); + return getGeneratedKey(connection); } catch (Exception e) { IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string); ise.initCause(e); - throw ise ; + throw ise; + } finally { + connectionManager.close(connection); } } @@ -73,9 +78,9 @@ * Retrieves the last assigned key from the database. * @return the index. */ - protected int getGeneratedKey() { + protected int getGeneratedKey(Connection connection) { try { - ResultSet rs = connectionManager.executeQuery(generatedKeySelectSQL); + ResultSet rs = connectionManager.executeQuery(connection, generatedKeySelectSQL); try { if (!rs.next()) { return -1; Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java (revision 0) @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.io.InputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; + +import javax.jcr.RepositoryException; + +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; + +/** + * This class provides methods to get a database connection and to execute SQL statements. + */ +public class ConnectionManager { + + private final ConnectionProvider connectionProvider; + private final ConnectionProperties connectionProperties; + + /** + * Creates a new {@link ConnectionManager} instance + * + * @param connectionProvider + * @param connectionProperties + */ + public ConnectionManager(ConnectionProvider connectionProvider, ConnectionProperties connectionProperties, + boolean threadBound) { + if (!threadBound) { + this.connectionProvider = connectionProvider; + } else { + this.connectionProvider = new ThreadLocalConnectionProviderAdapter(connectionProvider); + } + this.connectionProperties = connectionProperties; + } + + /** + * Returns a database {@link Connection} + * @return + * @throws SQLException + * @throws RepositoryException + */ + public Connection getConnection() throws SQLException, RepositoryException { + Connection connection = connectionProvider.getConnection(connectionProperties); + + ConnectionEntry e = (ConnectionEntry) connectionEntries.get(connection); + if (e != null) { + ++e.usageCount; + } else { + e = new ConnectionEntry(); + connectionEntries.put(connection, e); + } + + return connection; + }; + + /** + * Closes the database {@link Connection}. This method must always be called to close a connection + * (instead of calling {@link Connection#close()}). + * @param connection + */ + public void close(Connection connection) { + if (connection != null) { + ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection); + try { + connectionProvider.closeConnection(connection); + } catch (SQLException e) { + log.error("Error closing SQL connection", e); + } + if (entry != null) { + --entry.usageCount; + if (entry.usageCount <= 0) { + connectionEntries.remove(entry); + } + } + } + } + + public void close(Connection connection, Statement statement) { + if (statement != null) { + try { + if (connection != null) { + ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection); + if (entry != null) { + entry.statements.remove(statement); + } + } + statement.close(); + } catch (SQLException e) { + log.error("Error closing SQL statement", e); + } + } + } + + /** + * Entry for each active connection. + */ + private static class ConnectionEntry { + private ConnectionEntry() { + } + + private Map/**/ statements = new ConcurrentHashMap(); + private int usageCount = 1; + }; + + private Map/**/ connectionEntries = new ConcurrentHashMap(); + + public int getUsedConnectionsCount() { + return connectionEntries.size(); + } + + /** + * Executes the given SQL query. Retries once or blocks (when the + * block parameter has been set to true on construction) + * if this fails and autoReconnect is enabled. + * + * @param sql the SQL query to execute + * @return the executed ResultSet + * @throws SQLException on error + * @throws RepositoryException if the database driver could not be loaded + */ + public synchronized ResultSet executeQuery(Connection connection, String sql) throws SQLException, RepositoryException { + return executeQueryInternal(connection, sql); + } + + /** + * Executes the given SQL query. + * + * @param sql query to execute + * @return a ResultSet object + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + private ResultSet executeQueryInternal(Connection connection, String sql) throws SQLException, RepositoryException { + PreparedStatement stmt = null; + try { + ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection); + + stmt = (PreparedStatement) entry.statements.get(sql); + if (stmt == null) { + stmt = connection.prepareStatement(sql); + entry.statements.put(sql, stmt); + } + return stmt.executeQuery(); + } catch (SQLException e) { + logException("could not execute statement", e); + throw e; + } finally { + resetStatement(stmt); + } + } + + /** + * Resets the given PreparedStatement by clearing the + * parameters and warnings contained. + * + * @param stmt The PreparedStatement to reset. If + * null this method does nothing. + */ + private void resetStatement(PreparedStatement stmt) { + if (stmt != null) { + try { + stmt.clearParameters(); + stmt.clearWarnings(); + } catch (SQLException se) { + logException("Failed resetting PreparedStatement", se); + } + } + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + public PreparedStatement executeStmt(Connection connection, String sql, Object[] params) + throws SQLException, RepositoryException { + return executeStmt(connection, sql, params, false, 0); + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @param returnGeneratedKeys if the statement should return auto generated keys + * @param maxRows the maximum number of rows to return (0 for all rows) + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + public synchronized PreparedStatement executeStmt(Connection connection, + String sql, Object[] params, boolean returnGeneratedKeys, int maxRows) + throws SQLException, RepositoryException { + return executeStmtInternal(connection, sql, params, returnGeneratedKeys, maxRows); + } + + /** + * Executes the given SQL statement with the specified parameters. + * + * @param sql statement to execute + * @param params parameters to set + * @param returnGeneratedKeys if the statement should return auto generated keys + * @param maxRows the maximum number of rows to return (0 for all rows) + * @return the Statement object that had been executed + * @throws SQLException if an error occurs + * @throws RepositoryException if the database driver could not be loaded + */ + private PreparedStatement executeStmtInternal(Connection connection, + String sql, Object[] params, boolean returnGeneratedKeys, int maxRows) + throws SQLException, RepositoryException { + try { + String key = sql; + if (returnGeneratedKeys) { + key += " RETURN_GENERATED_KEYS"; + } + + ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection); + + PreparedStatement stmt = (PreparedStatement) entry.statements.get(key); + if (stmt == null) { + if (returnGeneratedKeys) { + stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + } else { + stmt = connection.prepareStatement(sql); + } + entry.statements.put(key, stmt); + } + stmt.setMaxRows(maxRows); + return executeStmtInternal(connection, params, stmt); + } catch (SQLException e) { + logException("could not execute statement", e); + throw e; + } + } + + /** + * @param params the parameters for the stmt parameter + * @param stmt the statement to execute + * @return the executed Statement + * @throws SQLException on error + */ + private PreparedStatement executeStmtInternal(Connection connection, Object[] params, PreparedStatement stmt) + throws SQLException { + for (int i = 0; params != null && i < params.length; i++) { + Object p = params[i]; + if (p instanceof StreamWrapper) { + StreamWrapper wrapper = (StreamWrapper) p; + stmt.setBinaryStream(i + 1, wrapper.getStream(), (int) wrapper.getSize()); + } else if (p instanceof InputStream) { + InputStream stream = (InputStream) p; + stmt.setBinaryStream(i + 1, stream, -1); + } else { + stmt.setObject(i + 1, p); + } + } + stmt.execute(); + resetStatement(stmt); + return stmt; + } + + /** + * Logs an sql exception. + * + * @param message the message + * @param se the exception + */ + private void logException(String message, SQLException se) { + message = message == null ? "" : message; + log.error(message + ", reason: " + se.getMessage() + ", state/code: " + + se.getSQLState() + "/" + se.getErrorCode()); + log.debug(" dump:", se); + } + + private static final Logger log = LoggerFactory.getLogger(ConnectionManager.class); +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java (revision 0) @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.sql.Connection; +import java.sql.SQLException; + +import javax.jcr.RepositoryException; + + +/** + * Class responsible for providing SQL {@link Connection}s. + *

    + * Implementation of this class can do connection pooling, in which case it must + * take {@link ConnectionProperties} into account and only return connection + * that matches the specified properties. + */ +public interface ConnectionProvider { + + /** + * Returns connection with given properties. + * + * @param properties + * connection properties + * @return SQL {@link Connection} + * + * @throws RepositoryException + * @throws SQLException + */ + Connection getConnection(ConnectionProperties properties) throws RepositoryException, SQLException; + + /** + * Closes the given connection. Classes that obtain connections through + * {@link ConnectionProvider} must never call {@link Connection#close()} + * directly. They are required to call {@link #closeConnection(Connection)} + * instead. + * + * @param connection + * SQL {@link Connection} + * + * @throws SQLException + */ + void closeConnection(Connection connection) throws SQLException; + + /** + * Invoked when the repository is being shut down. + * + * @throws RepositoryException + */ + void dispose() throws RepositoryException; + + /** + * Bean that holds properties necessary to create or identify a SQL + * {@link Connection}. + *

    + * Note that this class properly implements {@link #equals(Object)} and + * {@link #hashCode()} methods so it can be used as key in maps. + */ + public static final class ConnectionProperties { + private String user; + private String password; + private String url; + private String driver; + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setDriver(String driver) { + this.driver = driver; + } + + public String getDriver() { + return driver; + } + + private boolean equals(String s1, String s2) { + return s1 == s2 || (s1 != null && s1.equals(s2)); + } + + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof ConnectionProperties)) { + return false; + } + ConnectionProperties cp = (ConnectionProperties) obj; + return equals(user, cp.user) && equals(password, cp.password) + && equals(url, cp.url) && equals(driver, cp.driver); + + } + + private int hashCode(String s) { + return s != null ? s.hashCode() : 0; + } + + public int hashCode() { + return hashCode(user) + 37 * hashCode(password) + 373 + & hashCode(url) + 1187 * hashCode(driver); + } + } +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java (revision 0) @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.jcr.RepositoryException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple {@link ConnectionProvider} implementation that pools the database + * {@link Connection}s. This is a proof of concept implementation and should + * not be used for production. + */ +public class SimplePoolingConnectionProvider implements ConnectionProvider { + + public SimplePoolingConnectionProvider() { + + } + + public void closeConnection(Connection connection) throws SQLException { + if (connection != null) { + Pool pool = null; + synchronized (this) { + pool = (Pool) connectionToPool.remove(connection); + } + if (pool == null) { + log.warn("Trying to close connection not opened by this ConnectionManager"); + } else { + pool.returnConnection(connection); + } + } + } + + public synchronized void dispose() throws RepositoryException { + connectionToPool.clear(); + for (Iterator i = driverToPool.values().iterator(); i.hasNext();) { + Pool pool = (Pool) i.next(); + pool.dispose(); + } + connectionToPool.clear(); + } + + public Connection getConnection(ConnectionProperties properties) + throws RepositoryException, SQLException { + + if (properties == null) { + throw new IllegalArgumentException("Argument 'properties' may not be null."); + } + if (properties.getDriver() == null) { + throw new IllegalArgumentException("Database driver may not be null."); + } + + Pool pool; + synchronized (this) { + pool = (Pool) driverToPool.get(properties.getDriver()); + if (pool == null) { + pool = new Pool(); + driverToPool.put(properties.getDriver(), pool); + } + } + Connection c = pool.getConnection(properties); + synchronized (this) { + connectionToPool.put(c, pool); + } + + if (!c.getAutoCommit()) { + c.setAutoCommit(true); + } + + return c; + } + + private int minConnections = 10; + private int maxConnections = 20; + private boolean block = true; + + /** + * Sets the maximal amount of pooled connections. If there is demand for more connections, + * the manager will either block (if {@link #setBlock(boolean)} is true) or throw an exception + * (if {@link #setBlock(boolean)} is false). + * + * @param maxConnections + */ + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + + /** + * Returns the maximal amount of pooled connections. + * @return + */ + public int getMaxConnections() { + return maxConnections; + } + + /** + * Returns the minimal amount of pooled connections. + * @return + */ + public int getMinConnections() { + return minConnections; + } + + /** + * Sets the minimal amount of pooled connections. If connection is returned to the pool + * and there are more connections in pool than minimal amount of pooled connection, + * the last accessed avilable connection will be removed. + * @param minConnections + */ + public void setMinConnections(int minConnections) { + this.minConnections = minConnections; + } + + /** + * Sets the blocking behavior when there is no available connection in pool. + * If block is true, the requesting thread will be blocked, otherwise + * if block is false, and exception will be thrown. + * @param block + */ + public void setBlock(boolean block) { + this.block = block; + } + + public boolean isBlock() { + return block; + } + + private Map driverToPool = new HashMap(); + private Map connectionToPool = new HashMap(); + + /** + * Simple Pool implementation. + */ + private class Pool { + + private Pool() { + + } + + private final List entries = new ArrayList(); + + /** + * Entry for connection in pool + */ + private class ConnectionEntry { + + private ConnectionEntry(Connection connection, + ConnectionProperties connectionProperties, boolean available) { + this.connection = connection; + this.connectionProperties = connectionProperties; + this.available = available; + touch(); + } + + private final Connection connection; + private final ConnectionProperties connectionProperties; + private boolean available; + + public Connection getConnection() { + return connection; + } + + public ConnectionProperties getConnectionProperties() { + return connectionProperties; + } + + public void setAvailable(boolean available) { + this.available = available; + } + + public boolean isAvailable() { + return available; + } + + public void touch() { + lastAccessed = System.currentTimeMillis(); + } + + public long getLastAccessed() { + return lastAccessed; + } + + private long lastAccessed; + }; + + private Connection tryGetConnection(ConnectionProperties properties) throws RepositoryException, SQLException { + synchronized (entries) { + Iterator i = entries.iterator(); + while (i.hasNext()) { + ConnectionEntry e = (ConnectionEntry) i.next(); + if (e.isAvailable() + && e.getConnectionProperties().equals(properties)) { + e.setAvailable(false); + e.touch(); + return e.getConnection(); + } + } + + if (entries.size() >= maxConnections) { + // try remove at least one connection + removeOldestAvailableEntry(); + } + + if (entries.size() >= maxConnections) { + if (!isBlock()) { + throw new RepositoryException( + "Couldn't get any more database connections."); + } else { + return null; + } + } + + Connection connection = ConnectionFactory.getConnection( + properties.getDriver(), properties.getUrl(), properties + .getUser(), properties.getPassword()); + ConnectionEntry entry = new ConnectionEntry(connection, properties, false); + entries.add(entry); + return connection; + } + } + + private Connection getConnection(ConnectionProperties properties) throws RepositoryException, SQLException { + Connection connection = null; + while (connection == null) { + connection = tryGetConnection(properties); + if (connection == null) { + try { + final int sleep = 500; + log.info("No available connections in pool, waiting " + sleep + " ms"); + Thread.sleep(sleep); + } catch (InterruptedException ignore) { + + } + } + } + return connection; + } + + private void removeOldestAvailableEntry() { + ConnectionEntry oldest = null; + + for (Iterator i = entries.iterator(); i.hasNext();) { + ConnectionEntry entry = (ConnectionEntry) i.next(); + if (entry.isAvailable() && (oldest == null || oldest.getLastAccessed() > entry.getLastAccessed())) { + oldest = entry; + } + } + + if (oldest != null) { + entries.remove(oldest); + try { + oldest.getConnection().close(); + } catch (SQLException e) { + log.error("Error closing connection", e); + } + } + } + + private void returnConnection(Connection connection) { + try { + if (connection.isClosed()) { + log.error("Connection is closed!"); + } + connection.rollback(); + connection.setAutoCommit(true); + } catch (SQLException e) { + log.error("Error rollbacking connection", e); + } finally { + synchronized (entries) { + for (Iterator i = entries.iterator(); i.hasNext();) { + ConnectionEntry entry = (ConnectionEntry) i.next(); + if (entry.connection == connection) { + entry.setAvailable(true); + if (entries.size() > getMinConnections()) { + removeOldestAvailableEntry(); + } + return; + } + } + } + } + } + + private void dispose() { + for (Iterator i = entries.iterator(); i.hasNext();) { + ConnectionEntry entry = (ConnectionEntry) i.next(); + try { + entry.getConnection().close(); + } catch (SQLException e) { + log.error("Error closing connection ", e); + } + } + } + }; + + private static final Logger log = LoggerFactory + .getLogger(SimplePoolingConnectionProvider.class); +} Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java (revision 0) @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.io.InputStream; + +public class StreamWrapper { + + private final InputStream stream; + private final long size; + + /** + * Creates a wrapper for the given InputStream that can + * safely be passed as a parameter to the executeStmt + * methods in the {@link ConnectionRecoveryManager} class. + * + * @param in the InputStream to wrap + * @param size the size of the input stream + */ + public StreamWrapper(InputStream in, long size) { + this.stream = in; + this.size = size; + } + + public InputStream getStream() { + return stream; + } + + public long getSize() { + return size; + } +} \ No newline at end of file Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ThreadLocalConnectionProviderAdapter.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ThreadLocalConnectionProviderAdapter.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ThreadLocalConnectionProviderAdapter.java (revision 0) @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.persistence.bundle.util; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import javax.jcr.RepositoryException; + +import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; + +/** + * Adapter for {@link ConnectionProvider} that binds connection to current thread. + * First call to {@link #getConnection(ConnectionProperties)} binds the connection to current thread. + * All subsequent calls to {@link #getConnection(ConnectionProperties)} return the same connection + * instance, until {@link #closeConnection(Connection)} is called appropriate number of times. + */ +public class ThreadLocalConnectionProviderAdapter implements ConnectionProvider { + + private final ConnectionProvider delegate; + + public ThreadLocalConnectionProviderAdapter(ConnectionProvider delegate) { + this.delegate = delegate; + } + + public void closeConnection(Connection connection) throws SQLException { + + ConnectionEntry e = (ConnectionEntry) connectionToEntry.get(connection); + if (e == null) { + throw new IllegalStateException( + "Trying to close unknown connection."); + } + + if (e.usageCount == 0) { + throw new IllegalStateException( + "Trying to close already closed connection."); + } + + --e.usageCount; + if (e.usageCount == 0) { + + // try to remove the connection from map in current thread + Map/* */map = (Map) threadLocal + .get(); + map.values().remove(e); + if (map.isEmpty()) { + threadLocal.remove(); + } + + connectionToEntry.remove(connection); + + // note that the entry can still be left in thread local in case the connection + // is closed from other thread. however it shouldn't cause any problems since + // the only thing left is entry and connectionproperties and the entry has usageCount + // set to 0 + e.connection = null; + + delegate.closeConnection(connection); + } + + } + + public void dispose() throws RepositoryException { + delegate.dispose(); + } + + private Map/* Connection, ConnectionEntry> */connectionToEntry = new ConcurrentHashMap(); + + private ThreadLocal/* > */threadLocal = new ThreadLocal(); + + /** + * Entry for connection. + */ + private static class ConnectionEntry { + private ConnectionEntry() { + } + private Connection connection; + private int usageCount; + }; + + public Connection getConnection(ConnectionProperties properties) + throws RepositoryException, SQLException { + + Map/* */map = (Map) threadLocal.get(); + if (map != null) { + ConnectionEntry e = (ConnectionEntry) map.get(properties); + + // reuse the connection when threadBound is true or when connection + // is not in auto commit mode + if (e != null && e.connection != null) { + ++e.usageCount; + return e.connection; + } + } + + ConnectionEntry e = new ConnectionEntry(); + e.usageCount = 1; + e.connection = delegate.getConnection(properties); + + if (map == null) { + map = new HashMap(); + threadLocal.set(map); + } + + map.put(properties, e); + connectionToEntry.put(e.connection, e); + + return e.connection; + } + +} Index: src/main/java/org/apache/jackrabbit/core/persistence/db/DatabasePersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/db/DatabasePersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/db/DatabasePersistenceManager.java (working copy) @@ -20,6 +20,7 @@ import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.PropertyId; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; import org.apache.jackrabbit.core.persistence.AbstractPersistenceManager; import org.apache.jackrabbit.core.persistence.PMContext; @@ -223,7 +224,7 @@ */ LocalFileSystem blobFS = new LocalFileSystem(); blobFS.setRoot(new File(context.getHomeDir(), "blobs")); - blobFS.init(); + blobFS.init(new FileSystemContext(null)); this.blobFS = blobFS; blobStore = new FileSystemBLOBStore(blobFS); } else { Index: src/main/java/org/apache/jackrabbit/core/persistence/mem/InMemPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/mem/InMemPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/mem/InMemPersistenceManager.java (working copy) @@ -20,6 +20,7 @@ import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.PropertyId; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemPathUtil; import org.apache.jackrabbit.core.fs.FileSystemResource; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; @@ -304,7 +305,7 @@ */ LocalFileSystem blobFS = new LocalFileSystem(); blobFS.setRoot(new File(context.getHomeDir(), "blobs")); - blobFS.init(); + blobFS.init(new FileSystemContext(null)); this.blobFS = blobFS; blobStore = new FileSystemBLOBStore(blobFS); Index: src/main/java/org/apache/jackrabbit/core/persistence/obj/ObjectPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/obj/ObjectPersistenceManager.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/obj/ObjectPersistenceManager.java (working copy) @@ -20,6 +20,7 @@ import org.apache.jackrabbit.core.PropertyId; import org.apache.jackrabbit.core.fs.BasedFileSystem; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.FileSystemResource; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; @@ -148,7 +149,7 @@ */ LocalFileSystem blobFS = new LocalFileSystem(); blobFS.setRoot(new File(context.getHomeDir(), "blobs")); - blobFS.init(); + blobFS.init(new FileSystemContext(context.getConnectionProvider())); this.blobFS = blobFS; blobStore = new FileSystemBLOBStore(blobFS); Index: src/main/java/org/apache/jackrabbit/core/persistence/PMContext.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/PMContext.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/persistence/PMContext.java (working copy) @@ -16,13 +16,15 @@ */ package org.apache.jackrabbit.core.persistence; +import java.io.File; + +import javax.jcr.NamespaceRegistry; + +import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.data.DataStore; import org.apache.jackrabbit.core.fs.FileSystem; import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry; -import org.apache.jackrabbit.core.NodeId; - -import javax.jcr.NamespaceRegistry; -import java.io.File; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; /** * A PMContext is used to provide context information for a @@ -62,6 +64,8 @@ */ private final DataStore dataStore; + private final ConnectionProvider connectionProvider; + /** * Creates a new PMContext. * @@ -76,7 +80,8 @@ NodeId rootNodeId, NamespaceRegistry nsReg, NodeTypeRegistry ntReg, - DataStore dataStore) { + DataStore dataStore, + ConnectionProvider connectionProvider) { this.physicalHomeDir = homeDir; this.fs = fs; this.rootNodeId = rootNodeId; @@ -83,6 +88,7 @@ this.nsReg = nsReg; this.ntReg = ntReg; this.dataStore = dataStore; + this.connectionProvider = connectionProvider; } @@ -136,4 +142,8 @@ public DataStore getDataStore() { return dataStore; } + + public ConnectionProvider getConnectionProvider() { + return connectionProvider; + } } Index: src/main/java/org/apache/jackrabbit/core/persistence/xml/XMLPersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/xml/XMLPersistenceManager.java (revision 683292) +++ src/main/java/org/apache/jackrabbit/core/persistence/xml/XMLPersistenceManager.java (working copy) @@ -21,6 +21,7 @@ import org.apache.jackrabbit.core.PropertyId; import org.apache.jackrabbit.core.fs.BasedFileSystem; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.FileSystemResource; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; @@ -428,7 +429,7 @@ */ LocalFileSystem blobFS = new LocalFileSystem(); blobFS.setRoot(new File(context.getHomeDir(), "blobs")); - blobFS.init(); + blobFS.init(new FileSystemContext(context.getConnectionProvider())); this.blobFS = blobFS; blobStore = new FileSystemBLOBStore(blobFS); Index: src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (revision 683292) +++ src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (working copy) @@ -16,9 +16,26 @@ */ package org.apache.jackrabbit.core.query.lucene; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jcr.RepositoryException; + +import org.apache.commons.collections.iterators.EmptyIterator; import org.apache.jackrabbit.core.NodeId; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; +import org.apache.jackrabbit.core.state.ChildNodeEntry; import org.apache.jackrabbit.core.state.ItemStateException; import org.apache.jackrabbit.core.state.ItemStateManager; import org.apache.jackrabbit.core.state.NoSuchItemStateException; @@ -23,32 +40,17 @@ import org.apache.jackrabbit.core.state.ItemStateManager; import org.apache.jackrabbit.core.state.NoSuchItemStateException; import org.apache.jackrabbit.core.state.NodeState; -import org.apache.jackrabbit.core.state.ChildNodeEntry; -import org.apache.jackrabbit.uuid.Constants; -import org.apache.jackrabbit.uuid.UUID; -import org.apache.jackrabbit.util.Timer; import org.apache.jackrabbit.spi.Path; import org.apache.jackrabbit.spi.PathFactory; import org.apache.jackrabbit.spi.commons.name.PathFactoryImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.jackrabbit.util.Timer; +import org.apache.jackrabbit.uuid.Constants; +import org.apache.jackrabbit.uuid.UUID; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; -import org.apache.commons.collections.iterators.EmptyIterator; - -import javax.jcr.RepositoryException; -import java.io.IOException; -import java.io.File; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Arrays; -import java.util.Set; -import java.util.HashSet; -import java.util.HashMap; -import java.util.Map; -import java.util.Collection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A MultiIndex consists of a {@link VolatileIndex} and multiple @@ -261,7 +263,7 @@ try { LocalFileSystem fs = new LocalFileSystem(); fs.setRoot(indexDir); - fs.init(); + fs.init(new FileSystemContext(null)); store = new IndexingQueueStore(fs, INDEXING_QUEUE_FILE); } catch (FileSystemException e) { throw Util.createIOException(e); Index: src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (revision 676201) +++ src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (working copy) @@ -21,6 +21,7 @@ import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.NodeIdIterator; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemResource; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.local.LocalFileSystem; @@ -491,8 +492,8 @@ log.info("Index initialized: {} Version: {}", new Object[]{path, index.getIndexFormatVersion()}); if (!index.getIndexFormatVersion().equals(getIndexFormatVersion())) { - log.warn("Using Version {} for reading. Please re-index version " + - "storage for optimal performance.", + log.warn("Using Version {} for reading. Please re-index version " + + "storage for optimal performance.", new Integer(getIndexFormatVersion().getVersion())); } } @@ -1010,7 +1011,7 @@ File root = new File(path, synonymProviderConfigPath.substring(0, lastSeparator)); ((LocalFileSystem) fs).setRoot(root.getCanonicalFile()); - fs.init(); + fs.init(new FileSystemContext(null)); fsr = new FileSystemResource(fs, synonymProviderConfigPath.substring(lastSeparator + 1)); } else { @@ -1015,7 +1016,7 @@ synonymProviderConfigPath.substring(lastSeparator + 1)); } else { ((LocalFileSystem) fs).setPath(path); - fs.init(); + fs.init(new FileSystemContext(null)); fsr = new FileSystemResource(fs, synonymProviderConfigPath); } synonymProviderConfigFs = fs; Index: src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (revision 681176) +++ src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (working copy) @@ -16,10 +16,35 @@ */ package org.apache.jackrabbit.core; -import EDU.oswego.cs.dl.util.concurrent.Mutex; -import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; -import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock; -import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import javax.jcr.AccessDeniedException; +import javax.jcr.Credentials; +import javax.jcr.LoginException; +import javax.jcr.NamespaceRegistry; +import javax.jcr.NoSuchWorkspaceException; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.observation.Event; +import javax.jcr.observation.EventIterator; +import javax.jcr.observation.EventListener; +import javax.jcr.observation.ObservationManager; +import javax.security.auth.Subject; + import org.apache.commons.collections.map.ReferenceMap; import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.api.JackrabbitRepository; @@ -39,8 +64,10 @@ import org.apache.jackrabbit.core.config.VersioningConfig; import org.apache.jackrabbit.core.config.WorkspaceConfig; import org.apache.jackrabbit.core.data.DataStore; +import org.apache.jackrabbit.core.data.DataStoreContext; import org.apache.jackrabbit.core.fs.BasedFileSystem; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.fs.FileSystemResource; import org.apache.jackrabbit.core.lock.LockManager; @@ -52,6 +79,7 @@ import org.apache.jackrabbit.core.observation.ObservationDispatcher; import org.apache.jackrabbit.core.persistence.PMContext; import org.apache.jackrabbit.core.persistence.PersistenceManager; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; import org.apache.jackrabbit.core.security.JackrabbitSecurityManager; import org.apache.jackrabbit.core.security.authentication.AuthContext; import org.apache.jackrabbit.core.state.CacheManager; @@ -72,33 +100,10 @@ import org.slf4j.LoggerFactory; import org.xml.sax.InputSource; -import javax.jcr.AccessDeniedException; -import javax.jcr.Credentials; -import javax.jcr.LoginException; -import javax.jcr.NamespaceRegistry; -import javax.jcr.NoSuchWorkspaceException; -import javax.jcr.RepositoryException; -import javax.jcr.Session; -import javax.jcr.observation.Event; -import javax.jcr.observation.EventIterator; -import javax.jcr.observation.EventListener; -import javax.jcr.observation.ObservationManager; -import javax.security.auth.Subject; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.security.AccessControlContext; -import java.security.AccessController; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Set; +import EDU.oswego.cs.dl.util.concurrent.Mutex; +import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; +import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock; +import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; /** * A RepositoryImpl ... @@ -248,7 +253,7 @@ repLock.acquire(); // setup file systems - repStore = repConfig.getFileSystemConfig().createFileSystem(); + repStore = repConfig.getFileSystemConfig().createFileSystem(new FileSystemContext(getConnectionProvider())); String fsRootPath = "/meta"; try { if (!repStore.exists(fsRootPath) || !repStore.isFolder(fsRootPath)) { @@ -423,7 +428,7 @@ throws RepositoryException { - FileSystem fs = vConfig.getFileSystemConfig().createFileSystem(); + FileSystem fs = vConfig.getFileSystemConfig().createFileSystem(new FileSystemContext(getConnectionProvider())); PersistenceManager pm = createPersistenceManager(vConfig.getHomeDir(), fs, vConfig.getPersistenceManagerConfig(), @@ -430,7 +435,8 @@ rootNodeId, nsReg, ntReg, - dataStore); + dataStore, + getConnectionProvider()); ISMLocking ismLocking = vConfig.getISMLockingConfig().createISMLocking(); @@ -658,7 +664,7 @@ SystemSession defSysSession = getSystemSession(wspName); systemSearchMgr = new SearchManager(repConfig.getSearchConfig(), nsReg, ntReg, defSysSession.getItemStateManager(), - vMgr.getPersistenceManager(), SYSTEM_ROOT_NODE_ID, null, null); + vMgr.getPersistenceManager(), SYSTEM_ROOT_NODE_ID, null, null, getConnectionProvider()); ObservationManager obsMgr = defSysSession.getWorkspace().getObservationManager(); obsMgr.addEventListener(systemSearchMgr, Event.NODE_ADDED | Event.NODE_REMOVED | Event.PROPERTY_ADDED @@ -693,7 +699,7 @@ protected DataStore createDataStore() throws RepositoryException { DataStoreConfig dsc = repConfig.getDataStoreConfig(); DataStore dataStore = (DataStore) dsc.newInstance(); - dataStore.init(repConfig.getHomeDir()); + dataStore.init(new DataStoreContext(repConfig.getHomeDir(), getConnectionProvider())); return dataStore; } @@ -1136,7 +1142,9 @@ // finally release repository lock repLock.release(); - + + getConfig().dispose(); + log.info("Repository has been shutdown"); } @@ -1272,11 +1280,12 @@ NodeId rootNodeId, NamespaceRegistry nsReg, NodeTypeRegistry ntReg, - DataStore dataStore) + DataStore dataStore, + ConnectionProvider connectionProvider) throws RepositoryException { try { PersistenceManager pm = (PersistenceManager) pmConfig.newInstance(); - pm.init(new PMContext(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore)); + pm.init(new PMContext(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore, connectionProvider)); return pm; } catch (Exception e) { String msg = "Cannot instantiate persistence manager " + pmConfig.getClassName(); @@ -1718,7 +1727,8 @@ persistMgr, rootNodeId, getSystemSearchManager(getName()), - SYSTEM_ROOT_NODE_ID); + SYSTEM_ROOT_NODE_ID, + getConnectionProvider()); } return searchMgr; } @@ -1837,7 +1847,7 @@ */ protected void doInitialize() throws RepositoryException { FileSystemConfig fsConfig = config.getFileSystemConfig(); - fs = fsConfig.createFileSystem(); + fs = fsConfig.createFileSystem(new FileSystemContext(getConnectionProvider())); persistMgr = createPersistenceManager(new File(config.getHomeDir()), fs, @@ -1845,7 +1855,8 @@ rootNodeId, nsReg, ntReg, - dataStore); + dataStore, + getConnectionProvider()); ISMLocking ismLocking = config.getISMLockingConfig().createISMLocking(); @@ -2177,4 +2188,14 @@ return RepositoryImpl.this.getDataStore(); } } + + /** + * Returns the {@link ConnectionProvider} for this repository + * + * @return + * @throws RepositoryException + */ + public ConnectionProvider getConnectionProvider() throws RepositoryException { + return getConfig().getConnectionProvider(); + } } Index: src/main/java/org/apache/jackrabbit/core/SearchManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/SearchManager.java (revision 681176) +++ src/main/java/org/apache/jackrabbit/core/SearchManager.java (working copy) @@ -18,6 +18,7 @@ import org.apache.jackrabbit.core.config.SearchConfig; import org.apache.jackrabbit.core.fs.FileSystem; +import org.apache.jackrabbit.core.fs.FileSystemContext; import org.apache.jackrabbit.core.fs.FileSystemException; import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry; import org.apache.jackrabbit.core.observation.EventImpl; @@ -32,6 +33,7 @@ import org.apache.jackrabbit.core.state.NodeState; import org.apache.jackrabbit.core.state.NodeStateIterator; import org.apache.jackrabbit.core.persistence.PersistenceManager; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; import org.apache.jackrabbit.spi.commons.conversion.MalformedPathException; import org.apache.jackrabbit.spi.commons.query.jsr283.qom.QueryObjectModel; import org.apache.jackrabbit.spi.commons.query.qom.QueryObjectModelTree; @@ -177,6 +179,7 @@ * @param excludedNodeId id of the node that should be excluded from * indexing. Any descendant of that node will also be * excluded from indexing. + * @param connectionProvider * @throws RepositoryException if the search manager cannot be initialized */ public SearchManager(SearchConfig config, @@ -186,9 +189,10 @@ PersistenceManager pm, NodeId rootNodeId, SearchManager parentMgr, - NodeId excludedNodeId) throws RepositoryException { + NodeId excludedNodeId, + ConnectionProvider connectionProvider) throws RepositoryException { if (config.getFileSystemConfig() != null) { - fs = config.getFileSystemConfig().createFileSystem(); + fs = config.getFileSystemConfig().createFileSystem(new FileSystemContext(connectionProvider)); } else { fs = null; } Index: src/main/java/org/apache/jackrabbit/core/state/PMContext.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/state/PMContext.java (revision 683292) +++ src/main/java/org/apache/jackrabbit/core/state/PMContext.java (working copy) @@ -16,6 +16,10 @@ */ package org.apache.jackrabbit.core.state; +import java.io.File; + +import javax.jcr.NamespaceRegistry; + import org.apache.jackrabbit.core.NodeId; import org.apache.jackrabbit.core.data.DataStore; import org.apache.jackrabbit.core.fs.FileSystem; @@ -20,9 +24,7 @@ import org.apache.jackrabbit.core.data.DataStore; import org.apache.jackrabbit.core.fs.FileSystem; import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry; - -import javax.jcr.NamespaceRegistry; -import java.io.File; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; /** * Legacy class kept for backward compatibility reasons. @@ -32,7 +34,8 @@ public class PMContext extends org.apache.jackrabbit.core.persistence.PMContext { public PMContext(File homeDir, FileSystem fs, NodeId rootNodeId, - NamespaceRegistry nsReg, NodeTypeRegistry ntReg, DataStore dataStore) { - super(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore); + NamespaceRegistry nsReg, NodeTypeRegistry ntReg, + DataStore dataStore, ConnectionProvider connectionProvider) { + super(homeDir, fs, rootNodeId, nsReg, ntReg, dataStore, connectionProvider); } } Index: src/test/java/org/apache/jackrabbit/core/fs/AbstractFileSystemTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/core/fs/AbstractFileSystemTest.java (revision 681176) +++ src/test/java/org/apache/jackrabbit/core/fs/AbstractFileSystemTest.java (working copy) @@ -25,6 +25,9 @@ import org.apache.jackrabbit.core.fs.FileSystem; import org.apache.jackrabbit.core.fs.FileSystemException; +import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider; +import org.apache.jackrabbit.core.persistence.bundle.util.SimplePoolingConnectionProvider; +import org.apache.jackrabbit.core.persistence.bundle.util.ThreadLocalConnectionProviderAdapter; public abstract class AbstractFileSystemTest extends TestCase { @@ -34,10 +37,15 @@ protected abstract FileSystem getFileSystem(); protected void setUp() throws Exception { - fs = getFileSystem(); - fs.init(); + fs = getFileSystem(); + fs.init(newFileSystemContext()); } + protected FileSystemContext newFileSystemContext() { + ConnectionProvider provider = new ThreadLocalConnectionProviderAdapter(new SimplePoolingConnectionProvider()); + return new FileSystemContext(provider); + } + protected void tearDown() throws Exception { fs.close(); }