Index: src/main/java/org/apache/jackrabbit/core/config/FileSystemConfig.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/config/FileSystemConfig.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/config/FileSystemConfig.java (working copy)
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.core.config;
import org.apache.jackrabbit.core.fs.FileSystem;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
import org.apache.jackrabbit.core.fs.FileSystemException;
/**
@@ -41,10 +42,10 @@
* @return new initialized file system instance.
* @throws ConfigurationException on file system initialization errors
*/
- public FileSystem createFileSystem() throws ConfigurationException {
+ public FileSystem createFileSystem(FileSystemContext context) throws ConfigurationException {
try {
FileSystem fs = (FileSystem) newInstance();
- fs.init();
+ fs.init(context);
return fs;
} catch (ClassCastException e) {
throw new ConfigurationException(
Index: src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/config/RepositoryConfig.java (working copy)
@@ -16,22 +16,6 @@
*/
package org.apache.jackrabbit.core.config;
-import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.core.fs.FileSystem;
-import org.apache.jackrabbit.core.fs.FileSystemException;
-import org.apache.jackrabbit.core.fs.FileSystemPathUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Element;
-import org.xml.sax.InputSource;
-
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerConfigurationException;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
@@ -48,6 +32,27 @@
import java.util.Map;
import java.util.Properties;
+import javax.jcr.RepositoryException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.fs.FileSystem;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
+import org.apache.jackrabbit.core.fs.FileSystemException;
+import org.apache.jackrabbit.core.fs.FileSystemPathUtil;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider;
+import org.apache.jackrabbit.core.persistence.bundle.util.SimplePoolingConnectionProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+import org.xml.sax.InputSource;
+
/**
* Repository configuration. This configuration class is used to
* create configured repository objects.
@@ -224,6 +229,11 @@
private final DataStoreConfig dataStoreConfig;
/**
+ * Optional connection provider configuration (<code>null</code> if not configured).
+ */
+ private final ConnectionProviderConfig connectionProviderConfig;
+
+ /**
* Creates a repository configuration object.
*
* @param home repository home directory
@@ -244,7 +254,8 @@
String workspaceDirectory, String workspaceConfigDirectory,
String defaultWorkspace, int workspaceMaxIdleTime,
Element template, VersioningConfig vc, SearchConfig sc,
- ClusterConfig cc, DataStoreConfig dataStoreConfig, RepositoryConfigurationParser parser) {
+ ClusterConfig cc, DataStoreConfig dataStoreConfig,
+ ConnectionProviderConfig connectionProviderConfig, RepositoryConfigurationParser parser) {
workspaces = new HashMap();
this.home = home;
this.sec = sec;
@@ -258,6 +269,7 @@
this.sc = sc;
this.cc = cc;
this.dataStoreConfig = dataStoreConfig;
+ this.connectionProviderConfig = connectionProviderConfig;
this.parser = parser;
}
@@ -286,7 +298,7 @@
// a configuration directoy had been specified; search for
// workspace configurations in virtual repository file system
// rather than in physical workspace root directory on disk
- FileSystem fs = fsc.createFileSystem();
+ FileSystem fs = fsc.createFileSystem(new FileSystemContext(getConnectionProvider()));
try {
if (!fs.exists(workspaceConfigDirectory)) {
fs.createFolder(workspaceConfigDirectory);
@@ -482,7 +494,7 @@
// a configuration directoy had been specified;
// workspace configurations are maintained in
// virtual repository file system
- virtualFS = fsc.createFileSystem();
+ virtualFS = fsc.createFileSystem(new FileSystemContext(getConnectionProvider()));
} else {
// workspace configurations are maintained on disk
virtualFS = null;
@@ -753,6 +765,65 @@
public DataStoreConfig getDataStoreConfig() {
return dataStoreConfig;
}
+
+    /**
+     * Returns the connection provider configuration of this repository.
+     *
+     * @return the {@link ConnectionProviderConfig}, or <code>null</code>
+     *         if no provider has been configured
+     */
+    public ConnectionProviderConfig getConnectionProviderConfig() {
+        return this.connectionProviderConfig;
+    }
+
+ /**
+ * Connection provider of this repository. It is assigned on the first call
+ * to {@link #getConnectionProvider()} and released by {@link #dispose()}.
+ */
+ private ConnectionProvider connectionProvider;
+
+    /**
+     * Creates the connection provider that is used when the repository
+     * configuration contains no connection provider configuration.
+     *
+     * @return a new {@link SimplePoolingConnectionProvider}
+     */
+    protected ConnectionProvider createDefaultConnectionProvider() {
+        ConnectionProvider fallback = new SimplePoolingConnectionProvider();
+        return fallback;
+    }
+
+    /**
+     * Creates a new connection provider. A configured
+     * {@link ConnectionProviderConfig} takes precedence; without one the
+     * provider from {@link #createDefaultConnectionProvider()} is returned.
+     *
+     * @return the newly created connection provider
+     * @throws ConfigurationException if the configured provider cannot be created
+     */
+    protected ConnectionProvider createConnectionProvider() throws ConfigurationException {
+        ConnectionProviderConfig cpc = getConnectionProviderConfig();
+        if (cpc == null) {
+            return createDefaultConnectionProvider();
+        }
+        return cpc.createConnectionProvider();
+    }
+
+    /**
+     * Returns the {@link ConnectionProvider} for this repository. The provider
+     * is created on the first call and the same instance is returned by later
+     * calls. This method is not synchronized.
+     *
+     * @return the connection provider of this repository
+     * @throws ConfigurationException if the connection provider cannot be created
+     */
+    public ConnectionProvider getConnectionProvider() throws ConfigurationException {
+        if (connectionProvider == null) {
+            connectionProvider = createConnectionProvider();
+        }
+        return connectionProvider;
+    }
+
+    /**
+     * Disposes the connection provider if one has been created. Errors
+     * raised while disposing are logged and not propagated.
+     */
+    public void dispose() {
+        if (connectionProvider == null) {
+            // no provider was ever requested, nothing to release
+            return;
+        }
+        try {
+            connectionProvider.dispose();
+        } catch (Exception e) {
+            log.error("Error while disposing ConnectionProvider.", e);
+        }
+    }
}
Index: src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/config/RepositoryConfigurationParser.java (working copy)
@@ -16,6 +16,9 @@
*/
package org.apache.jackrabbit.core.config;
+import java.io.File;
+import java.util.Properties;
+
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
@@ -21,9 +24,6 @@
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
-import java.io.File;
-import java.util.Properties;
-
/**
* Configuration parser. This class is used to parse the repository and
* workspace configuration files.
@@ -146,6 +146,12 @@
private static final String AC_PROVIDER_ELEMENT = "AccessControlProvider";
/**
+ * Name of optional ConnectionProvider element for configuring custom
+ * ConnectionProvider implementations.
+ */
+ private static final String CONNECTION_PROVIDER_ELEMENT = "ConnectionProvider";
+
+ /**
* Creates a new configuration parser with the given parser variables.
*
* @param variables parser variables
@@ -248,9 +254,15 @@
// Optional data store configuration
DataStoreConfig dsc = parseDataStoreConfig(root);
+ Element connectionProviderElement = getElement(root, CONNECTION_PROVIDER_ELEMENT, false);
+ ConnectionProviderConfig cpc = null;
+ if (connectionProviderElement != null) {
+ cpc = new ConnectionProviderConfig(parseBeanConfig(root, CONNECTION_PROVIDER_ELEMENT));
+ }
+
return new RepositoryConfig(home, securityConfig, fsc,
workspaceDirectory, workspaceConfigDirectory, defaultWorkspace,
- maxIdleTime, template, vc, sc, cc, dsc, this);
+ maxIdleTime, template, vc, sc, cc, dsc, cpc, this);
}
/**
@@ -507,7 +519,7 @@
Element element = getElement(parent, WSP_SECURITY_ELEMENT, false);
if (element != null) {
Element provFact = getElement(element, AC_PROVIDER_ELEMENT, false);
- if (provFact !=null ) {
+ if (provFact != null) {
factConf = parseBeanConfig(element, AC_PROVIDER_ELEMENT);
}
}
Index: src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/config/ConnectionProviderConfig.java (revision 0)
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.config;
+
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider;
+
+/**
+ * BeanConfig for {@link ConnectionProvider} configuration.
+ */
+public class ConnectionProviderConfig extends BeanConfig {
+
+    /**
+     * Creates a connection provider configuration from the given bean
+     * configuration.
+     *
+     * @param config bean configuration of the connection provider
+     */
+    public ConnectionProviderConfig(BeanConfig config) {
+        super(config);
+    }
+
+    /**
+     * Instantiates the configured connection provider implementation class.
+     *
+     * @return new connection provider instance.
+     * @throws ConfigurationException if the configured class is not a
+     *         {@link ConnectionProvider} implementation
+     */
+    public ConnectionProvider createConnectionProvider() throws ConfigurationException {
+        try {
+            return (ConnectionProvider) newInstance();
+        } catch (ClassCastException e) {
+            // the configured class does not implement ConnectionProvider
+            throw new ConfigurationException(
+                    "Invalid connection provider implementation class "
+                    + getClassName() + ".", e);
+        }
+    }
+}
Index: src/main/java/org/apache/jackrabbit/core/data/DataStore.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/data/DataStore.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/data/DataStore.java (working copy)
@@ -102,10 +102,10 @@
/**
* Initialized the data store
*
- * @param homeDir the home directory of the repository
+ * @param context context providing the repository home directory and the connection provider
* @throws RepositoryException
*/
- void init(String homeDir) throws RepositoryException;
+ void init(DataStoreContext context) throws RepositoryException;
/**
* Get the minimum size of an object that should be stored in this data store.
Index: src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (revision 678172)
+++ src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (working copy)
@@ -16,18 +16,6 @@
*/
package org.apache.jackrabbit.core.data.db;
-import org.apache.jackrabbit.core.data.DataIdentifier;
-import org.apache.jackrabbit.core.data.DataRecord;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
-import org.apache.jackrabbit.core.persistence.bundle.util.TrackingInputStream;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager.StreamWrapper;
-import org.apache.jackrabbit.util.Text;
-import org.apache.jackrabbit.uuid.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -37,9 +25,11 @@
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -49,6 +39,21 @@
import javax.jcr.RepositoryException;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreContext;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
+import org.apache.jackrabbit.core.persistence.bundle.util.StreamWrapper;
+import org.apache.jackrabbit.core.persistence.bundle.util.TrackingInputStream;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties;
+import org.apache.jackrabbit.util.Text;
+import org.apache.jackrabbit.uuid.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A data store implementation that stores the records in a database using JDBC.
*
@@ -61,7 +66,6 @@
* <param name="{@link #setDatabaseType(String) databaseType}" value="postgresql"/>
* <param name="{@link #setDriver(String) driver}" value="org.postgresql.Driver"/>
* <param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
- * <param name="{@link #setMaxConnections(int) maxConnections}" value="2"/>
* <param name="{@link #setCopyWhenReading(int) copyWhenReading}" value="true"/>
* <param name="{@link #setTablePrefix(int) tablePrefix}" value=""/>
* </DataStore>
@@ -149,14 +153,9 @@
protected int minRecordLength = DEFAULT_MIN_RECORD_LENGTH;
/**
- * The maximum number of open connections.
- */
- protected int maxConnections = DEFAULT_MAX_CONNECTIONS;
-
- /**
- * A list of connections
+ * Connection Manager
*/
- protected Pool connectionPool;
+ protected ConnectionManager connectionManager;
/**
* The prefix used for temporary objects.
@@ -283,9 +282,10 @@
public DataRecord addRecord(InputStream stream) throws DataStoreException {
ResultSet rs = null;
TempFileInputStream fileInput = null;
- ConnectionRecoveryManager conn = getConnection();
+ Connection connection = null;
try {
- conn.setAutoReconnect(false);
+ connection = getConnection();
+
String id = null, tempId = null;
long now;
for (int i = 0; i < ConnectionRecoveryManager.TRIALS; i++) {
@@ -293,7 +293,7 @@
now = System.currentTimeMillis();
id = UUID.randomUUID().toString();
tempId = TEMP_PREFIX + id;
- PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{tempId});
+ PreparedStatement prep = connectionManager.executeStmt(connection, selectMetaSQL, new Object[]{tempId});
rs = prep.getResultSet();
if (rs.next()) {
// re-try in the very, very unlikely event that the row already exists
@@ -299,7 +299,7 @@
// re-try in the very, very unlikely event that the row already exists
continue;
}
- conn.executeStmt(insertTempSQL, new Object[]{tempId, new Long(now)});
+ connectionManager.executeStmt(connection, insertTempSQL, new Object[]{tempId, new Long(now)});
break;
} catch (Exception e) {
throw convert("Can not insert new record", e);
@@ -326,7 +326,7 @@
} else {
throw new DataStoreException("Unsupported stream store algorithm: " + storeStream);
}
- conn.executeStmt(updateDataSQL, new Object[]{wrapper, tempId});
+ connectionManager.executeStmt(connection, updateDataSQL, new Object[]{wrapper, tempId});
now = System.currentTimeMillis();
long length = in.getPosition();
DataIdentifier identifier = new DataIdentifier(digest.digest());
@@ -335,7 +335,7 @@
// UPDATE DATASTORE SET ID=?, LENGTH=?, LAST_MODIFIED=?
// WHERE ID=?
// AND NOT EXISTS(SELECT ID FROM DATASTORE WHERE ID=?)
- PreparedStatement prep = conn.executeStmt(updateSQL, new Object[]{
+ PreparedStatement prep = connectionManager.executeStmt(connection, updateSQL, new Object[]{
id, new Long(length), new Long(now),
tempId, id});
int count = prep.getUpdateCount();
@@ -342,9 +342,9 @@
if (count == 0) {
// update count is 0, meaning such a row already exists
// DELETE FROM DATASTORE WHERE ID=?
- conn.executeStmt(deleteSQL, new Object[]{tempId});
+ connectionManager.executeStmt(connection, deleteSQL, new Object[]{tempId});
// SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID=?
- prep = conn.executeStmt(selectMetaSQL, new Object[]{id});
+ prep = connectionManager.executeStmt(connection, selectMetaSQL, new Object[]{id});
rs = prep.getResultSet();
if (rs.next()) {
long oldLength = rs.getLong(1);
@@ -362,7 +362,7 @@
}
usesIdentifier(identifier);
DbDataRecord record = new DbDataRecord(this, identifier, length, now);
- conn.setAutoReconnect(true);
+
return record;
} catch (Exception e) {
throw convert("Can not insert new record", e);
@@ -367,8 +367,8 @@
} catch (Exception e) {
throw convert("Can not insert new record", e);
} finally {
- conn.closeSilently(rs);
- putBack(conn);
+ closeSilently(rs);
+ putBack(connection);
if (fileInput != null) {
try {
fileInput.close();
@@ -397,8 +397,9 @@
* {@inheritDoc}
*/
public synchronized int deleteAllOlderThan(long min) throws DataStoreException {
- ConnectionRecoveryManager conn = getConnection();
+ Connection connection = null;
try {
+ connection = getConnection();
Iterator it = new ArrayList(inUse.keySet()).iterator();
while (it.hasNext()) {
DataIdentifier identifier = (DataIdentifier) it.next();
@@ -407,7 +408,7 @@
}
}
// DELETE FROM DATASTORE WHERE LAST_MODIFIED
- PreparedStatement prep = conn.executeStmt(deleteOlderSQL, new Long[]{new Long(min)});
+ PreparedStatement prep = connectionManager.executeStmt(connection, deleteOlderSQL, new Long[]{new Long(min)});
return prep.getUpdateCount();
} catch (Exception e) {
throw convert("Can not delete records", e);
@@ -412,7 +413,7 @@
} catch (Exception e) {
throw convert("Can not delete records", e);
} finally {
- putBack(conn);
+ putBack(connection);
}
}
@@ -420,12 +421,13 @@
* {@inheritDoc}
*/
public Iterator getAllIdentifiers() throws DataStoreException {
- ConnectionRecoveryManager conn = getConnection();
+ Connection connection = null;
ArrayList list = new ArrayList();
ResultSet rs = null;
try {
+ connection = getConnection();
// SELECT ID FROM DATASTORE
- PreparedStatement prep = conn.executeStmt(selectAllSQL, new Object[0]);
+ PreparedStatement prep = connectionManager.executeStmt(connection, selectAllSQL, new Object[0]);
rs = prep.getResultSet();
while (rs.next()) {
String id = rs.getString(1);
@@ -438,8 +440,8 @@
} catch (Exception e) {
throw convert("Can not read records", e);
} finally {
- conn.closeSilently(rs);
- putBack(conn);
+ closeSilently(rs);
+ putBack(connection);
}
}
@@ -464,13 +466,14 @@
* {@inheritDoc}
*/
public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
- ConnectionRecoveryManager conn = getConnection();
+ Connection connection = null;
usesIdentifier(identifier);
ResultSet rs = null;
try {
+ connection = getConnection();
String id = identifier.toString();
// SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID = ?
- PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{id});
+ PreparedStatement prep = connectionManager.executeStmt(connection, selectMetaSQL, new Object[]{id});
rs = prep.getResultSet();
if (!rs.next()) {
throw new DataStoreException("Record not found: " + identifier);
@@ -482,8 +485,8 @@
} catch (Exception e) {
throw convert("Can not read identifier " + identifier, e);
} finally {
- conn.closeSilently(rs);
- putBack(conn);
+ closeSilently(rs);
+ putBack(connection);
}
}
@@ -498,12 +501,12 @@
* or if the given identifier is invalid
*/
public DbResources getDatabaseResources(DataIdentifier identifier) throws DataStoreException {
- ConnectionRecoveryManager conn = null;
+ Connection connection = null;
ResultSet rs = null;
try {
- conn = getConnection();
+ connection = getConnection();
// SELECT ID, DATA FROM DATASTORE WHERE ID = ?
- PreparedStatement prep = conn.executeStmt(selectDataSQL, new Object[]{identifier.toString()});
+ PreparedStatement prep = connectionManager.executeStmt(connection, selectDataSQL, new Object[]{identifier.toString()});
rs = prep.getResultSet();
if (!rs.next()) {
throw new DataStoreException("Record not found: " + identifier);
@@ -514,7 +517,7 @@
// If the stream is null, go ahead and close resources
result = new ByteArrayInputStream(new byte[0]);
DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ putBack(connection);
} else {
result = new BufferedInputStream(stream);
if (copyWhenReading) {
@@ -523,11 +526,11 @@
}
}
- DbResources dbResources = new DbResources(conn, rs, prep, result, this);
+ DbResources dbResources = new DbResources(connection, rs, prep, result, this);
return dbResources;
} catch (Exception e) {
- DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ closeSilently(rs);
+ putBack(connection);
throw convert("Retrieving database resources ", e);
}
}
@@ -535,12 +538,20 @@
/**
* {@inheritDoc}
*/
- public synchronized void init(String homeDir) throws DataStoreException {
+ public synchronized void init(DataStoreContext context) throws DataStoreException {
+ Connection connection = null;
try {
initDatabaseType();
- connectionPool = new Pool(this, maxConnections);
- ConnectionRecoveryManager conn = getConnection();
- DatabaseMetaData meta = conn.getConnection().getMetaData();
+ ConnectionProperties cp = new ConnectionProperties();
+ cp.setDriver(getDriver());
+ cp.setUrl(getUrl());
+ cp.setUser(getUser());
+ cp.setPassword(getPassword());
+
+ connectionManager = new ConnectionManager(context.getConnectionProvider(), cp, false);
+
+ connection = getConnection();
+ DatabaseMetaData meta = connection.getMetaData();
log.info("Using JDBC driver " + meta.getDriverName() + " " + meta.getDriverVersion());
meta.getDriverVersion();
ResultSet rs = meta.getTables(null, null, tableSQL, null);
@@ -547,12 +558,13 @@
boolean exists = rs.next();
rs.close();
if (!exists) {
- conn.executeStmt(createTableSQL, null);
+ connectionManager.executeStmt(connection, createTableSQL, null);
}
- putBack(conn);
} catch (Exception e) {
- throw convert("Can not init data store, driver=" + driver + " url=" + url + " user=" + user +
- " tableSQL=" + tableSQL + " createTableSQL=" + createTableSQL, e);
+ throw convert("Can not init data store, driver=" + driver + " url=" + url + " user=" + user
+ + " tableSQL=" + tableSQL + " createTableSQL=" + createTableSQL, e);
+ } finally {
+ putBack(connection);
}
}
@@ -676,10 +688,11 @@
if (lastModified < minModifiedDate) {
long now = System.currentTimeMillis();
Long n = new Long(now);
- ConnectionRecoveryManager conn = getConnection();
+ Connection connection = null;
try {
+ connection = getConnection();
// UPDATE DATASTORE SET LAST_MODIFIED = ? WHERE ID = ? AND LAST_MODIFIED < ?
- conn.executeStmt(updateLastModifiedSQL, new Object[]{
+ connectionManager.executeStmt(connection, updateLastModifiedSQL, new Object[]{
n, identifier.toString(), n
});
return now;
@@ -686,7 +699,7 @@
} catch (Exception e) {
throw convert("Can not update lastModified", e);
} finally {
- putBack(conn);
+ putBack(connection);
}
}
return lastModified;
@@ -789,12 +802,7 @@
* {@inheritDoc}
*/
public synchronized void close() {
- ArrayList list = connectionPool.getAll();
- for (int i = 0; i < list.size(); i++) {
- ConnectionRecoveryManager conn = (ConnectionRecoveryManager) list.get(i);
- conn.close();
- }
- list.clear();
+ // NOTE(review): nothing is released here any more; presumably the connections
+ // are owned by the ConnectionProvider and freed when it is disposed - confirm.
}
protected void usesIdentifier(DataIdentifier identifier) {
@@ -816,13 +824,11 @@
}
}
- protected ConnectionRecoveryManager getConnection() throws DataStoreException {
+    /**
+     * Obtains a connection from the connection manager. Callers in this class
+     * hand it back through <code>putBack(Connection)</code> when they are done.
+     *
+     * @return the connection
+     * @throws DataStoreException if no connection can be obtained
+     */
+    protected Connection getConnection() throws DataStoreException {
try {
- ConnectionRecoveryManager conn = (ConnectionRecoveryManager) connectionPool.get();
- conn.setAutoReconnect(true);
- return conn;
- } catch (InterruptedException e) {
- throw new DataStoreException("Interrupted", e);
+            return connectionManager.getConnection();
+        } catch (SQLException e) {
+            throw new DataStoreException("Error getting connection", e);
} catch (RepositoryException e) {
throw new DataStoreException("Can not open a new connection", e);
}
@@ -828,40 +834,18 @@
}
}
- protected void putBack(ConnectionRecoveryManager conn) throws DataStoreException {
- try {
- connectionPool.add(conn);
- } catch (InterruptedException e) {
- throw new DataStoreException("Interrupted", e);
- }
+    /**
+     * Hands a connection back to the connection manager.
+     * <p>
+     * A <code>null</code> argument is ignored: callers initialise their local
+     * variable to <code>null</code> and call this method from a
+     * <code>finally</code> block, so <code>null</code> arrives whenever
+     * obtaining the connection failed. Without the guard a follow-up error
+     * here could hide the original exception.
+     *
+     * @param connection the connection to release, may be <code>null</code>
+     * @throws DataStoreException declared for callers and subclasses
+     */
+    protected void putBack(Connection connection) throws DataStoreException {
+        if (connection != null) {
+            connectionManager.close(connection);
+        }
}
- /**
- * Get the maximum number of concurrent connections.
- *
- * @return the maximum number of connections.
- */
- public int getMaxConnections() {
- return maxConnections;
- }
-
- /**
- * Set the maximum number of concurrent connections.
- *
- * @param maxConnections the new value
- */
- public void setMaxConnections(int maxConnections) {
- this.maxConnections = maxConnections;
- }
-
- /**
- * Create a new connection.
- *
- * @return the new connection
- */
- public ConnectionRecoveryManager createNewConnection() throws RepositoryException {
- ConnectionRecoveryManager conn = new ConnectionRecoveryManager(false, driver, url, user, password);
- return conn;
+    /**
+     * Closes the given result set. An {@link SQLException} raised while
+     * closing is logged and not propagated.
+     *
+     * @param rs the result set to close, may be <code>null</code>
+     */
+    protected void closeSilently(ResultSet rs) {
+        if (rs == null) {
+            return;
+        }
+        try {
+            rs.close();
+        } catch (SQLException e) {
+            log.error("Error closing ResultSet", e);
+        }
}
/**
Index: src/main/java/org/apache/jackrabbit/core/data/db/DbResources.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/data/db/DbResources.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/data/db/DbResources.java (working copy)
@@ -17,10 +17,10 @@
package org.apache.jackrabbit.core.data.db;
import java.io.InputStream;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@
private static Logger log = LoggerFactory.getLogger(DbResources.class);
- protected final ConnectionRecoveryManager conn;
+ protected final Connection connection;
protected final ResultSet rs;
protected final Statement stmt;
protected final InputStream in;
@@ -38,8 +38,8 @@
protected final DbDataStore store;
protected boolean closed;
- public DbResources(ConnectionRecoveryManager conn, ResultSet rs, Statement stmt, InputStream in, DbDataStore store) {
- this.conn = conn;
+ public DbResources(Connection connection, ResultSet rs, Statement stmt, InputStream in, DbDataStore store) {
+ this.connection = connection;
this.rs = rs;
this.stmt = stmt;
this.in = in;
@@ -47,8 +47,8 @@
this.closed = false;
}
- public ConnectionRecoveryManager getConnection() {
- return conn;
+    /**
+     * Returns the connection that was passed to the constructor.
+     *
+     * @return the connection held by these resources
+     */
+    public Connection getConnection() {
+        return this.connection;
}
public InputStream getInputStream() {
@@ -68,7 +68,7 @@
closed = true;
DatabaseHelper.closeSilently(rs);
try {
- store.putBack(conn);
+ store.putBack(connection);
} catch (Exception e) {
log.info("Closing DbResources: ", e);
}
Index: src/main/java/org/apache/jackrabbit/core/data/db/Pool.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/data/db/Pool.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/data/db/Pool.java (working copy)
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.core.data.db;
-
-import java.util.ArrayList;
-
-import javax.jcr.RepositoryException;
-
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
-/**
- * Implementation of a simple ConnectionRecoveryManager pool.
- * The maximum number of pooled objects can be set, and if more objects
- * are requested the pool waits until one object is put back.
- */
-public class Pool {
- protected final int maxSize;
- protected final ArrayList all = new ArrayList();
- protected final DbDataStore factory;
- protected final LinkedQueue pool = new LinkedQueue();
-
- /**
- * Create a new pool using the given factory and maximum pool size.
- *
- * @param factory the db data store
- * @param maxSize the maximum number of objects in the pool.
- */
- protected Pool(DbDataStore factory, int maxSize) {
- this.factory = factory;
- this.maxSize = Math.max(1, maxSize);
- }
-
- /**
- * Get a connection from the pool. This method may open a new connection if
- * required, or if the maximum number of connections are opened, it will
- * wait for one to be returned.
- *
- * @return the connection
- */
- protected Object get() throws InterruptedException, RepositoryException {
- Object o = pool.poll(0);
- if (o == null) {
- synchronized (all) {
- if (all.size() < maxSize) {
- o = factory.createNewConnection();
- all.add(o);
- }
- }
- if (o == null) {
- o = pool.take();
- }
- }
- return o;
- }
-
- /**
- * But a connection back into the pool.
- *
- * @param o the connection
- */
- protected void add(Object o) throws InterruptedException {
- pool.put(o);
- }
-
- /**
- * Get all connections (even if they are currently being used).
- *
- * @return all connections
- */
- protected ArrayList getAll() {
- return all;
- }
-}
Index: src/main/java/org/apache/jackrabbit/core/data/FileDataStore.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/data/FileDataStore.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/data/FileDataStore.java (working copy)
@@ -115,9 +115,9 @@
*
* @param config the repository configuration
*/
- public void init(String homeDir) {
+ public void init(DataStoreContext context) {
if (path == null) {
- path = homeDir + "/repository/datastore";
+ path = context.getHomeDir() + "/repository/datastore";
}
directory = new File(path);
directory.mkdirs();
Index: src/main/java/org/apache/jackrabbit/core/data/DataStoreContext.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/data/DataStoreContext.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/data/DataStoreContext.java (revision 0)
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider;
+
+/**
+ * {@link DataStoreContext} is used to provide contextual information to {@link DataStore}.
+ */
+public class DataStoreContext {
+
+ private final String homeDir;
+ private final ConnectionProvider connectionProvider;
+
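+ /**
+ * Creates a context holding the given repository home directory and connection provider.
+ * @param homeDir the home directory of the repository
+ * @param connectionProvider the connection provider exposed through {@link #getConnectionProvider()}
+ */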
+ public DataStoreContext(String homeDir, ConnectionProvider connectionProvider) {
+ this.homeDir = homeDir;
+ this.connectionProvider = connectionProvider;
+ }
+
+ public String getHomeDir() {
+ return homeDir;
+ }
+
+ public ConnectionProvider getConnectionProvider() {
+ return connectionProvider;
+ }
+}
Index: src/main/java/org/apache/jackrabbit/core/fs/BasedFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/BasedFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/BasedFileSystem.java (working copy)
@@ -72,7 +72,7 @@
/**
* {@inheritDoc}
*/
- public void init() throws FileSystemException {
+ public void init(FileSystemContext context) throws FileSystemException {
// check base path
if (!fsBase.isFolder(basePath)) {
fsBase.createFolder(basePath);
Index: src/main/java/org/apache/jackrabbit/core/fs/db/DatabaseFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/db/DatabaseFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/db/DatabaseFileSystem.java (working copy)
@@ -16,19 +16,9 @@
*/
package org.apache.jackrabbit.core.fs.db;
-import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.core.fs.FileSystem;
-import org.apache.jackrabbit.core.fs.FileSystemException;
-import org.apache.jackrabbit.core.fs.FileSystemPathUtil;
-import org.apache.jackrabbit.core.fs.RandomAccessOutputStream;
-import org.apache.jackrabbit.util.Text;
-import org.apache.jackrabbit.util.TransientFileFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jcr.RepositoryException;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
@@ -36,11 +26,9 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.io.FileInputStream;
import java.io.RandomAccessFile;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -45,8 +33,21 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.HashMap;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.fs.FileSystem;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
+import org.apache.jackrabbit.core.fs.FileSystemException;
+import org.apache.jackrabbit.core.fs.FileSystemPathUtil;
+import org.apache.jackrabbit.core.fs.RandomAccessOutputStream;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager;
+import org.apache.jackrabbit.core.persistence.bundle.util.StreamWrapper;
+import org.apache.jackrabbit.util.Text;
+import org.apache.jackrabbit.util.TransientFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base class for database file systems. This class contains common
@@ -70,6 +71,8 @@
protected boolean initialized;
+ protected ConnectionManager connectionManager;
+
protected String schema;
protected String schemaObjectPrefix;
@@ -76,15 +79,6 @@
// initial size of buffer used to serialize objects
protected static final int INITIAL_BUFFER_SIZE = 8192;
- // jdbc connection
- protected Connection con;
-
- // time to sleep in ms before a reconnect is attempted
- protected static final int SLEEP_BEFORE_RECONNECT = 10000;
-
- // the map of prepared statements (key: sql stmt, value: prepared stmt)
- private HashMap preparedStatements = new HashMap();
-
// SQL statements
protected String selectExistSQL;
protected String selectFileExistSQL;
@@ -174,7 +168,7 @@
/**
* {@inheritDoc}
*/
- public void init() throws FileSystemException {
+ public void init(FileSystemContext context) throws FileSystemException {
if (initialized) {
throw new IllegalStateException("already initialized");
}
@@ -181,7 +175,7 @@
try {
// setup jdbc connection
- initConnection();
+ initConnectionManager(context);
// make sure schemaObjectPrefix consists of legal name characters only
prepareSchemaObjectPrefix();
@@ -192,9 +186,6 @@
// build sql statements
buildSQLStatements();
- // prepare statements
- initPreparedStatements();
-
// finally verify that there's a file system root entry
verifyRootExists();
@@ -214,23 +205,7 @@
throw new IllegalStateException("not initialized");
}
- try {
- // close shared prepared statements
- Iterator it = preparedStatements.values().iterator();
- while (it.hasNext()) {
- closeStatement((PreparedStatement) it.next());
- }
- preparedStatements.clear();
-
- // close jdbc connection
- closeConnection(con);
- } catch (SQLException e) {
- String msg = "error closing file system";
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- initialized = false;
- }
+ initialized = false;
}
/**
@@ -264,16 +239,18 @@
String name = FileSystemPathUtil.getName(filePath);
int count = 0;
- synchronized (deleteFileSQL) {
- try {
- Statement stmt = executeStmt(
- deleteFileSQL, new Object[]{parentDir, name});
- count = stmt.getUpdateCount();
- } catch (SQLException e) {
- String msg = "failed to delete file: " + filePath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- }
+ Connection connection = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, deleteFileSQL, new Object[]{parentDir, name});
+ count = stmt.getUpdateCount();
+ } catch (Exception e) {
+ String msg = "failed to delete file: " + filePath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
if (count == 0) {
@@ -299,19 +276,22 @@
String name = FileSystemPathUtil.getName(folderPath);
int count = 0;
- synchronized (deleteFolderSQL) {
- try {
- Statement stmt = executeStmt(deleteFolderSQL, new Object[]{
- parentDir,
- name,
- folderPath,
- folderPath + FileSystem.SEPARATOR + "%"});
- count = stmt.getUpdateCount();
- } catch (SQLException e) {
- String msg = "failed to delete folder: " + folderPath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- }
+ Connection connection = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, deleteFolderSQL, new Object[]{
+ parentDir,
+ name,
+ folderPath,
+ folderPath + FileSystem.SEPARATOR + "%"});
+ count = stmt.getUpdateCount();
+ } catch (Exception e) {
+ String msg = "failed to delete folder: " + folderPath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
if (count == 0) {
@@ -332,23 +312,25 @@
String parentDir = FileSystemPathUtil.getParentDir(path);
String name = FileSystemPathUtil.getName(path);
- synchronized (selectExistSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectExistSQL, new Object[]{parentDir, name});
- rs = stmt.getResultSet();
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectExistSQL, new Object[]{parentDir, name});
+ rs = stmt.getResultSet();
- // a file system entry exists if the result set
- // has at least one entry
- return rs.next();
- } catch (SQLException e) {
- String msg = "failed to check existence of file system entry: " + path;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
- }
+ // a file system entry exists if the result set
+ // has at least one entry
+ return rs.next();
+ } catch (Exception e) {
+ String msg = "failed to check existence of file system entry: " + path;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -365,22 +347,25 @@
String parentDir = FileSystemPathUtil.getParentDir(path);
String name = FileSystemPathUtil.getName(path);
- synchronized (selectFileExistSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectFileExistSQL, new Object[]{parentDir, name});
- rs = stmt.getResultSet();
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
- // a file exists if the result set has at least one entry
- return rs.next();
- } catch (SQLException e) {
- String msg = "failed to check existence of file: " + path;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
- }
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectFileExistSQL, new Object[]{parentDir, name});
+ rs = stmt.getResultSet();
+
+ // a file exists if the result set has at least one entry
+ return rs.next();
+ } catch (Exception e) {
+ String msg = "failed to check existence of file: " + path;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -397,22 +382,24 @@
String parentDir = FileSystemPathUtil.getParentDir(path);
String name = FileSystemPathUtil.getName(path);
- synchronized (selectFolderExistSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectFolderExistSQL, new Object[]{parentDir, name});
- rs = stmt.getResultSet();
+ Connection connection = null;
+ ResultSet rs = null;
- // a folder exists if the result set has at least one entry
- return rs.next();
- } catch (SQLException e) {
- String msg = "failed to check existence of folder: " + path;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
- }
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectFolderExistSQL, new Object[]{parentDir, name});
+ rs = stmt.getResultSet();
+
+ // a folder exists if the result set has at least one entry
+ return rs.next();
+ } catch (Exception e) {
+ String msg = "failed to check existence of folder: " + path;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -429,23 +416,25 @@
String parentDir = FileSystemPathUtil.getParentDir(path);
String name = FileSystemPathUtil.getName(path);
- synchronized (selectLastModifiedSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectLastModifiedSQL, new Object[]{parentDir, name});
- rs = stmt.getResultSet();
- if (!rs.next()) {
- throw new FileSystemException("no such file system entry: " + path);
- }
- return rs.getLong(1);
- } catch (SQLException e) {
- String msg = "failed to determine lastModified of file system entry: " + path;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectLastModifiedSQL, new Object[]{parentDir, name});
+ rs = stmt.getResultSet();
+ if (!rs.next()) {
+ throw new FileSystemException("no such file system entry: " + path);
}
+ return rs.getLong(1);
+ } catch (Exception e) {
+ String msg = "failed to determine lastModified of file system entry: " + path;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -462,23 +451,26 @@
String parentDir = FileSystemPathUtil.getParentDir(filePath);
String name = FileSystemPathUtil.getName(filePath);
- synchronized (selectLengthSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectLengthSQL, new Object[]{parentDir, name});
- rs = stmt.getResultSet();
- if (!rs.next()) {
- throw new FileSystemException("no such file: " + filePath);
- }
- return rs.getLong(1);
- } catch (SQLException e) {
- String msg = "failed to determine length of file: " + filePath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectLengthSQL, new Object[]{parentDir, name});
+ rs = stmt.getResultSet();
+ if (!rs.next()) {
+ throw new FileSystemException("no such file: " + filePath);
}
+ return rs.getLong(1);
+ } catch (Exception e) {
+ String msg = "failed to determine length of file: " + filePath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -496,27 +488,31 @@
throw new FileSystemException("no such file system entry: " + path);
}
- synchronized (selectChildCountSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(selectChildCountSQL, new Object[]{path});
- rs = stmt.getResultSet();
- if (!rs.next()) {
- return false;
- }
- int count = rs.getInt(1);
- if (FileSystemPathUtil.denotesRoot(path)) {
- // ingore file system root entry
- count--;
- }
- return (count > 0);
- } catch (SQLException e) {
- String msg = "failed to determine child count of file system entry: " + path;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectChildCountSQL, new Object[]{path});
+
+ rs = stmt.getResultSet();
+ if (!rs.next()) {
+ return false;
+ }
+ int count = rs.getInt(1);
+ if (FileSystemPathUtil.denotesRoot(path)) {
+ // ignore file system root entry
+ count--;
}
+ return (count > 0);
+ } catch (Exception e) {
+ String msg = "failed to determine child count of file system entry: " + path;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -534,30 +530,33 @@
throw new FileSystemException("no such folder: " + folderPath);
}
- synchronized (selectFileAndFolderNamesSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectFileAndFolderNamesSQL, new Object[]{folderPath});
- rs = stmt.getResultSet();
- ArrayList names = new ArrayList();
- while (rs.next()) {
- String name = rs.getString(1);
- if (name.length() == 0
- && FileSystemPathUtil.denotesRoot(folderPath)) {
- // this is the file system root entry, skip...
- continue;
- }
- names.add(name);
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectFileAndFolderNamesSQL, new Object[]{folderPath});
+ rs = stmt.getResultSet();
+ ArrayList names = new ArrayList();
+ while (rs.next()) {
+ String name = rs.getString(1);
+ if (name.length() == 0
+ && FileSystemPathUtil.denotesRoot(folderPath)) {
+ // this is the file system root entry, skip...
+ continue;
}
- return (String[]) names.toArray(new String[names.size()]);
- } catch (SQLException e) {
- String msg = "failed to list child entries of folder: " + folderPath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
+ names.add(name);
}
+ return (String[]) names.toArray(new String[names.size()]);
+ } catch (Exception e) {
+ String msg = "failed to list child entries of folder: " + folderPath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -575,24 +574,27 @@
throw new FileSystemException("no such folder: " + folderPath);
}
- synchronized (selectFileNamesSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectFileNamesSQL, new Object[]{folderPath});
- rs = stmt.getResultSet();
- ArrayList names = new ArrayList();
- while (rs.next()) {
- names.add(rs.getString(1));
- }
- return (String[]) names.toArray(new String[names.size()]);
- } catch (SQLException e) {
- String msg = "failed to list file entries of folder: " + folderPath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectFileNamesSQL, new Object[]{folderPath});
+ rs = stmt.getResultSet();
+ ArrayList names = new ArrayList();
+ while (rs.next()) {
+ names.add(rs.getString(1));
}
+ return (String[]) names.toArray(new String[names.size()]);
+ } catch (Exception e) {
+ String msg = "failed to list file entries of folder: " + folderPath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -610,30 +612,33 @@
throw new FileSystemException("no such folder: " + folderPath);
}
- synchronized (selectFolderNamesSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectFolderNamesSQL, new Object[]{folderPath});
- rs = stmt.getResultSet();
- ArrayList names = new ArrayList();
- while (rs.next()) {
- String name = rs.getString(1);
- if (name.length() == 0
- && FileSystemPathUtil.denotesRoot(folderPath)) {
- // this is the file system root entry, skip...
- continue;
- }
- names.add(name);
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectFolderNamesSQL, new Object[]{folderPath});
+ rs = stmt.getResultSet();
+ ArrayList names = new ArrayList();
+ while (rs.next()) {
+ String name = rs.getString(1);
+ if (name.length() == 0
+ && FileSystemPathUtil.denotesRoot(folderPath)) {
+ // this is the file system root entry, skip...
+ continue;
}
- return (String[]) names.toArray(new String[names.size()]);
- } catch (SQLException e) {
- String msg = "failed to list folder entries of folder: " + folderPath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
+ names.add(name);
}
+ return (String[]) names.toArray(new String[names.size()]);
+ } catch (Exception e) {
+ String msg = "failed to list folder entries of folder: " + folderPath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -651,18 +656,21 @@
String name = FileSystemPathUtil.getName(filePath);
int count = 0;
- synchronized (updateLastModifiedSQL) {
- try {
- Statement stmt = executeStmt(updateLastModifiedSQL, new Object[]{
- new Long(System.currentTimeMillis()),
- parentDir,
- name});
- count = stmt.getUpdateCount();
- } catch (SQLException e) {
- String msg = "failed to touch file: " + filePath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- }
+ Connection connection = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, updateLastModifiedSQL, new Object[]{
+ new Long(System.currentTimeMillis()),
+ parentDir,
+ name});
+ count = stmt.getUpdateCount();
+ } catch (Exception e) {
+ String msg = "failed to touch file: " + filePath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
if (count == 0) {
@@ -683,32 +691,47 @@
String parentDir = FileSystemPathUtil.getParentDir(filePath);
String name = FileSystemPathUtil.getName(filePath);
- synchronized (selectDataSQL) {
- try {
- Statement stmt = executeStmt(
- selectDataSQL, new Object[]{parentDir, name});
+ Connection connection = null;
+ ResultSet rs = null;
+
+ try {
+ connection = connectionManager.getConnection();
+
+ Statement stmt = connectionManager.executeStmt(
+ connection, selectDataSQL, new Object[]{parentDir, name});
- final ResultSet rs = stmt.getResultSet();
- if (!rs.next()) {
- throw new FileSystemException("no such file: " + filePath);
- }
- InputStream in = rs.getBinaryStream(1);
- /**
- * return an InputStream wrapper in order to
- * close the ResultSet when the stream is closed
- */
- return new FilterInputStream(in) {
- public void close() throws IOException {
- super.close();
- // close ResultSet
- closeResultSet(rs);
- }
- };
- } catch (SQLException e) {
- String msg = "failed to retrieve data of file: " + filePath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
+ rs = stmt.getResultSet();
+ if (!rs.next()) {
+ throw new FileSystemException("no such file: " + filePath);
}
+ InputStream in = rs.getBinaryStream(1);
+
+ // prevent closing connection and resultset in the finally block
+ final Connection c = connection;
+ final ResultSet r = rs;
+ connection = null;
+ rs = null;
+
+ /**
+ * return an InputStream wrapper in order to close the ResultSet
+ * and release the connection when the stream is closed
+ */
+ return new FilterInputStream(in) {
+ public void close() throws IOException {
+ // close ResultSet and release the connection
+ closeResultSet(r);
+ connectionManager.close(c);
+
+ super.close();
+ }
+ };
+ } catch (Exception e) {
+ String msg = "failed to retrieve data of file: " + filePath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -744,33 +767,34 @@
super.close();
InputStream in = null;
+ Connection connection = null;
try {
+ connection = connectionManager.getConnection();
+
if (isFile(filePath)) {
- synchronized (updateDataSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- executeStmt(updateDataSQL,
- new Object[]{
- new SizedInputStream(in, length),
- new Long(System.currentTimeMillis()),
- new Long(length),
- parentDir,
- name
- });
- }
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ connectionManager.executeStmt(connection, updateDataSQL,
+ new Object[]{
+ new StreamWrapper(in, length),
+ new Long(System.currentTimeMillis()),
+ new Long(length),
+ parentDir,
+ name
+ });
+
} else {
- synchronized (insertFileSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- executeStmt(insertFileSQL,
- new Object[]{
- parentDir,
- name,
- new SizedInputStream(in, length),
- new Long(System.currentTimeMillis()),
- new Long(length)
- });
- }
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ connectionManager.executeStmt(connection, insertFileSQL,
+ new Object[]{
+ parentDir,
+ name,
+ new StreamWrapper(in, length),
+ new Long(System.currentTimeMillis()),
+ new Long(length)
+ });
+
}
} catch (Exception e) {
@@ -778,6 +802,8 @@
ioe.initCause(e);
throw ioe;
} finally {
+ connectionManager.close(connection);
+
if (in != null) {
in.close();
}
@@ -841,33 +867,32 @@
raf.close();
InputStream in = null;
+ Connection connection = null;
try {
+ connection = connectionManager.getConnection();
+
if (isFile(filePath)) {
- synchronized (updateDataSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- executeStmt(updateDataSQL,
- new Object[]{
- new SizedInputStream(in, length),
- new Long(System.currentTimeMillis()),
- new Long(length),
- parentDir,
- name
- });
- }
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ connectionManager.executeStmt(connection, updateDataSQL,
+ new Object[]{
+ new StreamWrapper(in, length),
+ new Long(System.currentTimeMillis()),
+ new Long(length),
+ parentDir,
+ name
+ });
} else {
- synchronized (insertFileSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- executeStmt(insertFileSQL,
- new Object[]{
- parentDir,
- name,
- new SizedInputStream(in, length),
- new Long(System.currentTimeMillis()),
- new Long(length)
- });
- }
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ connectionManager.executeStmt(connection, insertFileSQL,
+ new Object[]{
+ parentDir,
+ name,
+ new StreamWrapper(in, length),
+ new Long(System.currentTimeMillis()),
+ new Long(length)
+ });
}
} catch (Exception e) {
@@ -875,6 +900,8 @@
ioe.initCause(e);
throw ioe;
} finally {
+ connectionManager.close(connection);
+
if (in != null) {
in.close();
}
@@ -952,159 +979,15 @@
//----------------------------------< misc. helper methods & overridables >
- /**
- * Initializes the database connection used by this file system.
- *
- * Subclasses should normally override the {@link #getConnection()}
- * method instead of this one. The default implementation calls
- * {@link #getConnection()} to get the database connection and disables
- * the autocommit feature.
- *
- * @throws Exception if an error occurs
- */
- protected void initConnection() throws Exception {
- con = getConnection();
- // JCR-1013: Setter may fail unnecessarily on a managed connection
- if (!con.getAutoCommit()) {
- con.setAutoCommit(true);
- }
- }
- /**
- * Abstract factory method for creating a new database connection. This
- * method is called by {@link #initConnection()} when the file system is
- * started. The returned connection should come with the default JDBC
- * settings, as the {@link #initConnection()} method will explicitly set
- * the autoCommit and other properties as needed.
- *
- * Note that the returned database connection is kept during the entire
- * lifetime of the file system, after which it is closed by
- * {@link #close()} using the {@link #closeConnection(Connection)} method.
- *
- * @return new connection
- * @throws Exception if an error occurs
- */
- protected Connection getConnection() throws Exception {
+ protected ConnectionManager getConnectionManager(FileSystemContext context) throws Exception {
throw new UnsupportedOperationException("Override in a subclass!");
}
- /**
- * Closes the given database connection. This method is called by
- * {@link #close()} to close the connection acquired using
- * {@link #getConnection()} when the file system was started.
- *
- * The default implementation just calls the {@link Connection#close()}
- * method of the given connection, but subclasses can override this
- * method to provide more extensive database and connection cleanup.
- *
- * @param connection database connection
- * @throws SQLException if an error occurs
- */
- protected void closeConnection(Connection connection) throws SQLException {
- connection.close();
- }
-
- /**
- * Re-establishes the database connection. This method is called by
- * {@link #executeStmt(String, Object[])} after a SQLException
- * had been encountered.
- *
- * @return true if the connection could be successfully re-established,
- * false otherwise.
- */
- protected synchronized boolean reestablishConnection() {
- // in any case try to shut down current connection
- // gracefully in order to avoid potential memory leaks
-
- // close shared prepared statements
- Iterator it = preparedStatements.values().iterator();
- while (it.hasNext()) {
- closeStatement((PreparedStatement) it.next());
- }
- try {
- closeConnection(con);
- } catch (Exception ignore) {
- }
-
- // sleep for a while to give database a chance
- // to restart before a reconnect is attempted
-
- try {
- Thread.sleep(SLEEP_BEFORE_RECONNECT);
- } catch (InterruptedException ignore) {
- }
-
- // now try to re-establish connection
-
- try {
- initConnection();
- initPreparedStatements();
- return true;
- } catch (Exception e) {
- log.error("failed to re-establish connection", e);
- // reconnect failed
- return false;
- }
- }
-
- /**
- * Executes the given SQL statement with the specified parameters.
- * If a SQLException is encountered one attempt is made
- * to re-establish the database connection and re-execute the statement.
- *
- * @param sql statement to execute
- * @param params parameters to set
- * @return the Statement object that had been executed
- * @throws SQLException if an error occurs
- */
- protected Statement executeStmt(String sql, Object[] params)
- throws SQLException {
- int trials = 2;
- while (true) {
- PreparedStatement stmt = (PreparedStatement) preparedStatements.get(sql);
- try {
- for (int i = 0; i < params.length; i++) {
- if (params[i] instanceof SizedInputStream) {
- SizedInputStream in = (SizedInputStream) params[i];
- stmt.setBinaryStream(i + 1, in, (int) in.getSize());
- } else {
- stmt.setObject(i + 1, params[i]);
- }
- }
- stmt.execute();
- resetStatement(stmt);
- return stmt;
- } catch (SQLException se) {
- if (--trials == 0) {
- // no more trials, re-throw
- throw se;
- }
- log.warn("execute failed, about to reconnect...", se.getMessage());
-
- // try to reconnect
- if (reestablishConnection()) {
- // reconnect succeeded; check whether it's possible to
- // re-execute the prepared stmt with the given parameters
- for (int i = 0; i < params.length; i++) {
- if (params[i] instanceof SizedInputStream) {
- SizedInputStream in = (SizedInputStream) params[i];
- if (in.isConsumed()) {
- // we're unable to re-execute the prepared stmt
- // since an InputStream paramater has already
- // been 'consumed';
- // re-throw previous SQLException
- throw se;
- }
- }
- }
-
- // try again to execute the statement
- continue;
- } else {
- // reconnect failed, re-throw previous SQLException
- throw se;
- }
- }
+ protected void initConnectionManager(FileSystemContext context) throws Exception {
+ connectionManager = getConnectionManager(context);
+ if (connectionManager == null) {
+ throw new FileSystemException("Failed to init ConnectionManager. ConnectionManager may not be null.");
}
}
@@ -1116,25 +999,31 @@
* @throws Exception if an error occurs
*/
protected void prepareSchemaObjectPrefix() throws Exception {
- DatabaseMetaData metaData = con.getMetaData();
- String legalChars = metaData.getExtraNameCharacters();
- legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_";
+ Connection connection = null;
+ try {
+ connection = connectionManager.getConnection();
+ DatabaseMetaData metaData = connection.getMetaData();
+ String legalChars = metaData.getExtraNameCharacters();
+ legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_";
- String prefix = schemaObjectPrefix.toUpperCase();
- StringBuffer escaped = new StringBuffer();
- for (int i = 0; i < prefix.length(); i++) {
- char c = prefix.charAt(i);
- if (legalChars.indexOf(c) == -1) {
- escaped.append("_x");
- String hex = Integer.toHexString(c);
- escaped.append("0000".toCharArray(), 0, 4 - hex.length());
- escaped.append(hex);
- escaped.append("_");
- } else {
- escaped.append(c);
+ String prefix = schemaObjectPrefix.toUpperCase();
+ StringBuffer escaped = new StringBuffer();
+ for (int i = 0; i < prefix.length(); i++) {
+ char c = prefix.charAt(i);
+ if (legalChars.indexOf(c) == -1) {
+ escaped.append("_x");
+ String hex = Integer.toHexString(c);
+ escaped.append("0000".toCharArray(), 0, 4 - hex.length());
+ escaped.append(hex);
+ escaped.append("_");
+ } else {
+ escaped.append(c);
+ }
}
+ schemaObjectPrefix = escaped.toString();
+ } finally {
+ connectionManager.close(connection);
}
- schemaObjectPrefix = escaped.toString();
}
/**
@@ -1144,48 +1033,54 @@
* @throws Exception if an error occurs
*/
protected void checkSchema() throws Exception {
- DatabaseMetaData metaData = con.getMetaData();
- String tableName = schemaObjectPrefix + "FSENTRY";
- if (metaData.storesLowerCaseIdentifiers()) {
- tableName = tableName.toLowerCase();
- } else if (metaData.storesUpperCaseIdentifiers()) {
- tableName = tableName.toUpperCase();
- }
- ResultSet rs = metaData.getTables(null, null, tableName, null);
- boolean schemaExists;
+ Connection connection = null;
try {
- schemaExists = rs.next();
- } finally {
- rs.close();
- }
-
- if (!schemaExists) {
- // read ddl from resources
- InputStream in = DatabaseFileSystem.class.getResourceAsStream(schema + ".ddl");
- if (in == null) {
- String msg = "Configuration error: unknown schema '" + schema + "'";
- log.debug(msg);
- throw new RepositoryException(msg);
+ connection = connectionManager.getConnection();
+ DatabaseMetaData metaData = connection.getMetaData();
+ String tableName = schemaObjectPrefix + "FSENTRY";
+ if (metaData.storesLowerCaseIdentifiers()) {
+ tableName = tableName.toLowerCase();
+ } else if (metaData.storesUpperCaseIdentifiers()) {
+ tableName = tableName.toUpperCase();
}
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- Statement stmt = con.createStatement();
+ ResultSet rs = metaData.getTables(null, null, tableName, null);
+ boolean schemaExists;
try {
- String sql = reader.readLine();
- while (sql != null) {
- // Skip comments and empty lines
- if (!sql.startsWith("#") && sql.length() > 0) {
- // replace prefix variable
- sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix);
- // execute sql stmt
- stmt.executeUpdate(sql);
+ schemaExists = rs.next();
+ } finally {
+ rs.close();
+ }
+
+ if (!schemaExists) {
+ // read ddl from resources
+ InputStream in = DatabaseFileSystem.class.getResourceAsStream(schema + ".ddl");
+ if (in == null) {
+ String msg = "Configuration error: unknown schema '" + schema + "'";
+ log.debug(msg);
+ throw new RepositoryException(msg);
+ }
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = reader.readLine();
+ while (sql != null) {
+ // Skip comments and empty lines
+ if (!sql.startsWith("#") && sql.length() > 0) {
+ // replace prefix variable
+ sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix);
+ // execute sql stmt
+ stmt.executeUpdate(sql);
+ }
+ // read next sql stmt
+ sql = reader.readLine();
}
- // read next sql stmt
- sql = reader.readLine();
+ } finally {
+ IOUtils.closeQuietly(in);
+ closeStatement(stmt);
}
- } finally {
- IOUtils.closeQuietly(in);
- closeStatement(stmt);
}
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -1283,51 +1178,6 @@
}
/**
- * Initializes the map of prepared statements.
- *
- * @throws SQLException if an error occurs
- */
- protected void initPreparedStatements() throws SQLException {
- preparedStatements.put(
- selectExistSQL, con.prepareStatement(selectExistSQL));
- preparedStatements.put(
- selectFileExistSQL, con.prepareStatement(selectFileExistSQL));
- preparedStatements.put(
- selectFolderExistSQL, con.prepareStatement(selectFolderExistSQL));
- preparedStatements.put(
- selectChildCountSQL, con.prepareStatement(selectChildCountSQL));
- preparedStatements.put(
- selectDataSQL, con.prepareStatement(selectDataSQL));
- preparedStatements.put(
- selectLastModifiedSQL, con.prepareStatement(selectLastModifiedSQL));
- preparedStatements.put(
- selectLengthSQL, con.prepareStatement(selectLengthSQL));
- preparedStatements.put(
- selectFileNamesSQL, con.prepareStatement(selectFileNamesSQL));
- preparedStatements.put(
- selectFolderNamesSQL, con.prepareStatement(selectFolderNamesSQL));
- preparedStatements.put(
- selectFileAndFolderNamesSQL, con.prepareStatement(selectFileAndFolderNamesSQL));
- preparedStatements.put(
- deleteFileSQL, con.prepareStatement(deleteFileSQL));
- preparedStatements.put(
- deleteFolderSQL, con.prepareStatement(deleteFolderSQL));
- preparedStatements.put(
- insertFileSQL, con.prepareStatement(insertFileSQL));
- preparedStatements.put(
- insertFolderSQL, con.prepareStatement(insertFolderSQL));
- preparedStatements.put(
- updateDataSQL, con.prepareStatement(updateDataSQL));
- preparedStatements.put(
- updateLastModifiedSQL, con.prepareStatement(updateLastModifiedSQL));
- preparedStatements.put(
- copyFileSQL, con.prepareStatement(copyFileSQL));
- preparedStatements.put(
- copyFilesSQL, con.prepareStatement(copyFilesSQL));
-
- }
-
- /**
* Verifies that the root file system entry exists. If it doesn't exist yet
* it will be automatically created.
*
@@ -1335,27 +1185,29 @@
*/
protected void verifyRootExists() throws Exception {
// check if root file system entry exists
- synchronized (selectFolderExistSQL) {
- ResultSet rs = null;
- try {
- Statement stmt = executeStmt(
- selectFolderExistSQL,
- new Object[]{FileSystem.SEPARATOR, ""});
- rs = stmt.getResultSet();
+ Connection connection = null;
+ ResultSet rs = null;
- if (rs.next()) {
- // root entry exists
- return;
- }
- } catch (SQLException e) {
- String msg = "failed to check existence of file system root entry";
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- } finally {
- closeResultSet(rs);
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, selectFolderExistSQL,
+ new Object[]{FileSystem.SEPARATOR, ""});
+ rs = stmt.getResultSet();
+
+ if (rs.next()) {
+ // root entry exists
+ return;
}
+ } catch (Exception e) {
+ String msg = "failed to check existence of file system root entry";
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
+
// the root entry doesn't exist yet, create it...
createDeepFolder(FileSystem.SEPARATOR);
}
@@ -1378,19 +1230,21 @@
}
}
- synchronized (insertFolderSQL) {
- try {
- executeStmt(
- insertFolderSQL,
- new Object[]{
- parentDir,
- name,
- new Long(System.currentTimeMillis())});
- } catch (SQLException e) {
- String msg = "failed to create folder entry: " + folderPath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- }
+ Connection connection = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, insertFolderSQL,
+ new Object[]{
+ parentDir,
+ name,
+ new Long(System.currentTimeMillis())});
+ } catch (Exception e) {
+ String msg = "failed to create folder entry: " + folderPath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -1418,14 +1272,16 @@
copyDeepFolder(src, dest);
}
- synchronized (copyFilesSQL) {
- try {
- executeStmt(copyFilesSQL, new Object[]{destPath, srcPath});
- } catch (SQLException e) {
- String msg = "failed to copy file entries from " + srcPath + " to " + destPath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- }
+ Connection connection = null;
+ try {
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, copyFilesSQL, new Object[]{destPath, srcPath});
+ } catch (Exception e) {
+ String msg = "failed to copy file entries from " + srcPath + " to " + destPath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -1456,21 +1312,24 @@
}
int count = 0;
- synchronized (copyFileSQL) {
- try {
- Statement stmt = executeStmt(
- copyFileSQL,
- new Object[]{
- destParentDir,
- destName,
- srcParentDir,
- srcName});
- count = stmt.getUpdateCount();
- } catch (SQLException e) {
- String msg = "failed to copy file from " + srcPath + " to " + destPath;
- log.error(msg, e);
- throw new FileSystemException(msg, e);
- }
+ Connection connection = null;
+
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(
+ connection, copyFileSQL,
+ new Object[]{
+ destParentDir,
+ destName,
+ srcParentDir,
+ srcName});
+ count = stmt.getUpdateCount();
+ } catch (Exception e) {
+ String msg = "failed to copy file from " + srcPath + " to " + destPath;
+ log.error(msg, e);
+ throw new FileSystemException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
if (count == 0) {
@@ -1478,28 +1337,6 @@
}
}
- /**
- * Resets the given PreparedStatement by clearing the parameters
- * and warnings contained.
- *
PreparedStatement instance on which it
- * operates are thread safe.
- *
- * @param stmt The PreparedStatement to reset. If
- * null this method does nothing.
- */
- protected void resetStatement(PreparedStatement stmt) {
- if (stmt != null) {
- try {
- stmt.clearParameters();
- stmt.clearWarnings();
- } catch (SQLException se) {
- log.error("failed resetting PreparedStatement", se);
- }
- }
- }
-
protected void closeResultSet(ResultSet rs) {
if (rs != null) {
try {
@@ -1510,6 +1347,10 @@
}
}
+ /**
+ * Closes the given statement, logging rather than rethrowing any SQLException.
+ * @param stmt the statement to close; ignored if null
+ */
protected void closeStatement(Statement stmt) {
if (stmt != null) {
try {
@@ -1515,7 +1356,7 @@
try {
stmt.close();
} catch (SQLException se) {
- log.error("failed closing Statement", se);
+ log.error("Failed closing Statement", se);
}
}
}
@@ -1520,43 +1361,4 @@
}
}
- //--------------------------------------------------------< inner classes >
-
- class SizedInputStream extends FilterInputStream {
- private final long size;
- private boolean consumed = false;
-
- SizedInputStream(InputStream in, long size) {
- super(in);
- this.size = size;
- }
-
- long getSize() {
- return size;
- }
-
- boolean isConsumed() {
- return consumed;
- }
-
- public int read() throws IOException {
- consumed = true;
- return super.read();
- }
-
- public long skip(long n) throws IOException {
- consumed = true;
- return super.skip(n);
- }
-
- public int read(byte[] b) throws IOException {
- consumed = true;
- return super.read(b);
- }
-
- public int read(byte[] b, int off, int len) throws IOException {
- consumed = true;
- return super.read(b, off, len);
- }
- }
}
Index: src/main/java/org/apache/jackrabbit/core/fs/db/DbFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/db/DbFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/db/DbFileSystem.java (working copy)
@@ -16,12 +16,10 @@
*/
package org.apache.jackrabbit.core.fs.db;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import javax.jcr.RepositoryException;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties;
/**
* DbFileSystem is a generic JDBC-based FileSystem
@@ -187,13 +185,21 @@
//--------------------------------------------------< DatabaseFileSystem >
- /**
- * Initialize the JDBC connection.
- *
- * @throws SQLException if an error occurs
- */
- protected Connection getConnection() throws RepositoryException, SQLException {
- return ConnectionFactory.getConnection(driver, url, user, password);
+ protected ConnectionManager getConnectionManager(FileSystemContext context)
+ throws Exception {
+
+ ConnectionProperties properties = new ConnectionProperties();
+ properties.setDriver(getDriver());
+ properties.setUrl(getUrl());
+ properties.setUser(getUser());
+ properties.setPassword(getPassword());
+
+ ConnectionProvider provider = context.getConnectionProvider();
+ if (provider == null) {
+ throw new IllegalStateException("FileSystemContext must provide valid ConnectionProvider.");
+ }
+
+ return new ConnectionManager(provider, properties, true);
}
}
Index: src/main/java/org/apache/jackrabbit/core/fs/db/DerbyFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/db/DerbyFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/db/DerbyFileSystem.java (working copy)
@@ -16,13 +16,16 @@
*/
package org.apache.jackrabbit.core.fs.db;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.jackrabbit.core.fs.FileSystemException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Connection;
-
/**
* DerbyFileSystem is a JDBC-based FileSystem
* implementation for Jackrabbit that persists file system entries in an
@@ -40,7 +43,7 @@
* user: the database user (default: null)password: the user's password (default: null)shutdownOnClose: if true (the default) the
- * database is shutdown when the last connection is closed;
+ * database is shutdown when the file system is closed;
* set this to false when using a standalone databaseshutdownOnClose is set to true.
- *
- * @param connection database connection
- * @throws SQLException if an error occurs
- * @see DatabaseFileSystem#closeConnection(Connection)
- */
- protected void closeConnection(Connection connection) throws SQLException {
- // prepare connection url for issuing shutdown command
- String url;
- try {
- url = connection.getMetaData().getURL();
- } catch (SQLException e) {
- // JCR-1557: embedded derby db probably already shut down;
- // this happens when configuring multiple FS/PM instances
- // to use the same embedded derby db instance.
- log.debug("failed to retrieve connection url: embedded db probably already shut down", e);
+ /** name of the embedded driver */
+ public static final String DERBY_EMBEDDED_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
+
+ /**
+ * Shuts down the embedded Derby database.
+ * <p>
+ * Does nothing unless the configured driver is {@link #DERBY_EMBEDDED_DRIVER}.
+ * The shutdown URL is derived from the URL reported by a connection obtained
+ * from the connection manager: everything from the last ';' on is stripped
+ * and ";shutdown=true" is appended.
+ *
+ * @throws RepositoryException declared for failures while obtaining the
+ * connection (presumably raised by the connection manager)
+ * @throws SQLException if the connection metadata cannot be read or
+ * auto-commit cannot be reset
+ */
+ private void shutDown() throws RepositoryException, SQLException {
+ // check for embedded driver
+ if (!DERBY_EMBEDDED_DRIVER.equals(getDriver())) {
return;
}
- int pos = url.lastIndexOf(';');
- if (pos != -1) {
- // strip any attributes from connection url
- url = url.substring(0, pos);
+
+ // declared outside the try block because the shutdown url is still
+ // needed after the connection has been handed back in the finally block
+ String url;
+ Connection connection = null;
+ try {
+ connection = connectionManager.getConnection();
+ // prepare connection url for issuing shutdown command
+ url = connection.getMetaData().getURL();
+ int pos = url.lastIndexOf(';');
+ if (pos != -1) {
+ // strip any attributes from connection url
+ url = url.substring(0, pos);
+ }
+ url += ";shutdown=true";
+
+ // we have to reset the connection to 'autoCommit=true' before closing it;
+ // otherwise Derby would mysteriously complain about some pending uncommitted
+ // changes which can't possibly be true.
+ // @todo further investigate
+ connection.setAutoCommit(true);
+
+ } finally {
+ connectionManager.close(connection);
+ }
+
+ // now it's safe to shutdown the embedded Derby database
+ try {
+ DriverManager.getConnection(url);
+ } catch (SQLException e) {
+ // a shutdown command always raises a SQLException
+ log.info(e.getMessage());
}
- url += ";shutdown=true";
+ }
- // we have to reset the connection to 'autoCommit=true' before closing it;
- // otherwise Derby would mysteriously complain about some pending uncommitted
- // changes which can't possibly be true.
- // @todo further investigate
- connection.setAutoCommit(true);
- connection.close();
+ public void close() throws FileSystemException {
if (shutdownOnClose) {
- // now it's safe to shutdown the embedded Derby database
try {
- DriverManager.getConnection(url);
- } catch (SQLException e) {
- // a shutdown command always raises a SQLException
- log.info(e.getMessage());
+ shutDown();
+ } catch (Exception e) {
+ throw new FileSystemException("Error shutting down DerbyFileSystem", e);
}
}
+
+ super.close();
}
}
Index: src/main/java/org/apache/jackrabbit/core/fs/db/JNDIDatabaseFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/db/JNDIDatabaseFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/db/JNDIDatabaseFileSystem.java (working copy)
@@ -37,6 +37,8 @@
* WARNING: The acquired database connection is kept
* for the entire lifetime of the file system instance. The configured data
* source should be prepared for this.
+ *
+ * THIS CLASS NO LONGER WORKS WITH CONNECTION POOLING
*/
public class JNDIDatabaseFileSystem extends DatabaseFileSystem {
Index: src/main/java/org/apache/jackrabbit/core/fs/db/OracleFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/db/OracleFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/db/OracleFileSystem.java (working copy)
@@ -19,6 +19,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.util.Text;
import org.apache.jackrabbit.util.TransientFileFactory;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
import org.apache.jackrabbit.core.fs.FileSystemException;
import org.apache.jackrabbit.core.fs.FileSystemPathUtil;
import org.apache.jackrabbit.core.fs.RandomAccessOutputStream;
@@ -135,8 +136,8 @@
* @see oracle.sql.BLOB#DURATION_SESSION
* @see oracle.sql.BLOB#MODE_READWRITE
*/
- public void init() throws FileSystemException {
- super.init();
+ public void init(FileSystemContext context) throws FileSystemException {
+ super.init(context);
// initialize oracle.sql.BLOB class & constants
@@ -142,8 +143,10 @@
// use the Connection object for using the exact same
// class loader that the Oracle driver was loaded with
+ Connection connection = null;
try {
- blobClass = con.getClass().getClassLoader().loadClass("oracle.sql.BLOB");
+ connection = connectionManager.getConnection();
+ blobClass = connection.getClass().getClassLoader().loadClass("oracle.sql.BLOB");
durationSessionConstant =
new Integer(blobClass.getField("DURATION_SESSION").getInt(null));
modeReadWriteConstant =
@@ -152,6 +155,8 @@
String msg = "failed to load/introspect oracle.sql.BLOB";
log.error(msg, e);
throw new FileSystemException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -165,60 +170,67 @@
* @throws Exception if an error occurs
*/
protected void checkSchema() throws Exception {
- DatabaseMetaData metaData = con.getMetaData();
- String tableName = schemaObjectPrefix + "FSENTRY";
- if (metaData.storesLowerCaseIdentifiers()) {
- tableName = tableName.toLowerCase();
- } else if (metaData.storesUpperCaseIdentifiers()) {
- tableName = tableName.toUpperCase();
- }
- String userName = metaData.getUserName();
-
- ResultSet rs = metaData.getTables(null, userName, tableName, null);
- boolean schemaExists;
+ Connection connection = null;
try {
- schemaExists = rs.next();
- } finally {
- rs.close();
- }
+ connection = connectionManager.getConnection();
+ DatabaseMetaData metaData = connection.getMetaData();
+ String tableName = schemaObjectPrefix + "FSENTRY";
+ if (metaData.storesLowerCaseIdentifiers()) {
+ tableName = tableName.toLowerCase();
+ } else if (metaData.storesUpperCaseIdentifiers()) {
+ tableName = tableName.toUpperCase();
+ }
+ String userName = metaData.getUserName();
- if (!schemaExists) {
- // read ddl from resources
- InputStream in = OracleFileSystem.class.getResourceAsStream(schema + ".ddl");
- if (in == null) {
- String msg = "Configuration error: unknown schema '" + schema + "'";
- log.debug(msg);
- throw new RepositoryException(msg);
- }
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- Statement stmt = con.createStatement();
+ ResultSet rs = metaData.getTables(null, userName, tableName, null);
+ boolean schemaExists;
try {
- String sql = reader.readLine();
- while (sql != null) {
- // Skip comments and empty lines
- if (!sql.startsWith("#") && sql.length() > 0) {
- // replace prefix variable
- sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix);
+ schemaExists = rs.next();
+ } finally {
+ rs.close();
+ }
- // set the tablespace if it is defined
- String tspace;
- if (tableSpace == null || "".equals(tableSpace)) {
- tspace = "";
- } else {
- tspace = "tablespace " + tableSpace;
+ if (!schemaExists) {
+ // read ddl from resources
+ InputStream in = OracleFileSystem.class.getResourceAsStream(schema + ".ddl");
+ if (in == null) {
+ String msg = "Configuration error: unknown schema '" + schema + "'";
+ log.debug(msg);
+ throw new RepositoryException(msg);
+ }
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = reader.readLine();
+ while (sql != null) {
+ // Skip comments and empty lines
+ if (!sql.startsWith("#") && sql.length() > 0) {
+ // replace prefix variable
+ sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix);
+
+ // set the tablespace if it is defined
+ String tspace;
+ if (tableSpace == null || "".equals(tableSpace)) {
+ tspace = "";
+ } else {
+ tspace = "tablespace " + tableSpace;
+ }
+ sql = Text.replace(sql, TABLE_SPACE_VARIABLE, tspace).trim();
+
+ // execute sql stmt
+ stmt.executeUpdate(sql);
}
- sql = Text.replace(sql, TABLE_SPACE_VARIABLE, tspace).trim();
-
- // execute sql stmt
- stmt.executeUpdate(sql);
+ // read next sql stmt
+ sql = reader.readLine();
}
- // read next sql stmt
- sql = reader.readLine();
+ } finally {
+ IOUtils.closeQuietly(in);
+ closeStatement(stmt);
}
- } finally {
- IOUtils.closeQuietly(in);
- closeStatement(stmt);
}
+
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -368,35 +380,33 @@
InputStream in = null;
Blob blob = null;
+ Connection connection = null;
try {
+ connection = connectionManager.getConnection();
if (isFile(filePath)) {
- synchronized (updateDataSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- blob = createTemporaryBlob(in);
- executeStmt(updateDataSQL,
- new Object[]{
- blob,
- new Long(System.currentTimeMillis()),
- new Long(length),
- parentDir,
- name
- });
- }
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ blob = createTemporaryBlob(in);
+ connectionManager.executeStmt(connection, updateDataSQL,
+ new Object[]{
+ blob,
+ new Long(System.currentTimeMillis()),
+ new Long(length),
+ parentDir,
+ name
+ });
} else {
- synchronized (insertFileSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- blob = createTemporaryBlob(in);
- executeStmt(insertFileSQL,
- new Object[]{
- parentDir,
- name,
- blob,
- new Long(System.currentTimeMillis()),
- new Long(length)
- });
- }
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ blob = createTemporaryBlob(in);
+ connectionManager.executeStmt(connection, insertFileSQL,
+ new Object[]{
+ parentDir,
+ name,
+ blob,
+ new Long(System.currentTimeMillis()),
+ new Long(length)
+ });
}
} catch (Exception e) {
IOException ioe = new IOException(e.getMessage());
@@ -409,6 +419,7 @@
} catch (Exception e1) {
}
}
+ connectionManager.close(connection);
IOUtils.closeQuietly(in);
// temp file can now safely be removed
tmpFile.delete();
@@ -472,35 +483,37 @@
InputStream in = null;
Blob blob = null;
+ Connection connection = null;
try {
+ connection = connectionManager.getConnection();
if (isFile(filePath)) {
- synchronized (updateDataSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- blob = createTemporaryBlob(in);
- executeStmt(updateDataSQL,
- new Object[]{
- blob,
- new Long(System.currentTimeMillis()),
- new Long(length),
- parentDir,
- name
- });
- }
+
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ blob = createTemporaryBlob(in);
+ connectionManager.executeStmt(connection, updateDataSQL,
+ new Object[]{
+ blob,
+ new Long(System.currentTimeMillis()),
+ new Long(length),
+ parentDir,
+ name
+ });
+
} else {
- synchronized (insertFileSQL) {
- long length = tmpFile.length();
- in = new FileInputStream(tmpFile);
- blob = createTemporaryBlob(in);
- executeStmt(insertFileSQL,
- new Object[]{
- parentDir,
- name,
- blob,
- new Long(System.currentTimeMillis()),
- new Long(length)
- });
- }
+
+ long length = tmpFile.length();
+ in = new FileInputStream(tmpFile);
+ blob = createTemporaryBlob(in);
+ connectionManager.executeStmt(connection, insertFileSQL,
+ new Object[]{
+ parentDir,
+ name,
+ blob,
+ new Long(System.currentTimeMillis()),
+ new Long(length)
+ });
+
}
} catch (Exception e) {
IOException ioe = new IOException(e.getMessage());
@@ -513,6 +526,7 @@
} catch (Exception e1) {
}
}
+ connectionManager.close(connection);
IOUtils.closeQuietly(in);
// temp file can now safely be removed
tmpFile.delete();
@@ -562,27 +576,33 @@
blob.close();
return blob;
*/
- Method createTemporary = blobClass.getMethod("createTemporary",
- new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE});
- Object blob = createTemporary.invoke(null,
- new Object[]{con, Boolean.FALSE, durationSessionConstant});
- Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE});
- open.invoke(blob, new Object[]{modeReadWriteConstant});
- Method getBinaryOutputStream =
+ Connection connection = null;
+ try {
+ connection = connectionManager.getConnection();
+ Method createTemporary = blobClass.getMethod("createTemporary",
+ new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE});
+ Object blob = createTemporary.invoke(null,
+ new Object[]{connection, Boolean.FALSE, durationSessionConstant});
+ Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE});
+ open.invoke(blob, new Object[]{modeReadWriteConstant});
+ Method getBinaryOutputStream =
blobClass.getMethod("getBinaryOutputStream", new Class[0]);
- OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null);
- try {
- IOUtils.copy(in, out);
- } finally {
+ OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null);
try {
- out.flush();
- } catch (IOException ioe) {
+ IOUtils.copy(in, out);
+ } finally {
+ try {
+ out.flush();
+ } catch (IOException ioe) {
+ }
+ out.close();
}
- out.close();
+ Method close = blobClass.getMethod("close", new Class[0]);
+ close.invoke(blob, null);
+ return (Blob) blob;
+ } finally {
+ connectionManager.close(connection);
}
- Method close = blobClass.getMethod("close", new Class[0]);
- close.invoke(blob, null);
- return (Blob) blob;
}
/**
Index: src/main/java/org/apache/jackrabbit/core/fs/FileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/FileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/FileSystem.java (working copy)
@@ -43,9 +43,11 @@
/**
* Initialize the file system
*
+ * @param fileSystemContext file system context
+ *
* @throws FileSystemException if the file system initialization fails
*/
- void init() throws FileSystemException;
+ void init(FileSystemContext fileSystemContext) throws FileSystemException;
/**
* Close the file system. After calling this method, the file system is no
Index: src/main/java/org/apache/jackrabbit/core/fs/local/LocalFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/local/LocalFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/local/LocalFileSystem.java (working copy)
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.core.fs.local;
import org.apache.jackrabbit.core.fs.FileSystem;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
import org.apache.jackrabbit.core.fs.FileSystemException;
import org.apache.jackrabbit.core.fs.RandomAccessOutputStream;
import org.apache.jackrabbit.util.LazyFileInputStream;
@@ -143,7 +144,7 @@
/**
* {@inheritDoc}
*/
- public void init() throws FileSystemException {
+ public void init(FileSystemContext fileSystemContext) throws FileSystemException {
if (root == null) {
String msg = "root directory not set";
log.debug(msg);
Index: src/main/java/org/apache/jackrabbit/core/fs/mem/MemoryFileSystem.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/mem/MemoryFileSystem.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/fs/mem/MemoryFileSystem.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.fs.FileSystem;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
import org.apache.jackrabbit.core.fs.FileSystemException;
import org.apache.jackrabbit.core.fs.RandomAccessOutputStream;
@@ -166,7 +167,7 @@
return list(path).length > 0;
}
- public void init() {
+ public void init(FileSystemContext context) {
createFolderInternal("/");
}
Index: src/main/java/org/apache/jackrabbit/core/fs/FileSystemContext.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/fs/FileSystemContext.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/fs/FileSystemContext.java (revision 0)
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.fs;
+
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider;
+
+/**
+ * {@link FileSystemContext} is used to pass contextual information to a {@link FileSystem}.
+ */
+public class FileSystemContext {
+
+ private final ConnectionProvider connectionProvider;
+
+ /**
+ * Creates new FileSystemContext
+ *
+ * @param connectionProvider
+ * Connection provider. Not mandatory if the FileSystem does not
+ * use a database.
+ */
+ public FileSystemContext(ConnectionProvider connectionProvider) {
+ this.connectionProvider = connectionProvider;
+ }
+
+ public ConnectionProvider getConnectionProvider() {
+ return connectionProvider;
+ }
+
+}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (revision 681175)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleDbPersistenceManager.java (working copy)
@@ -46,6 +46,7 @@
import org.apache.jackrabbit.core.NodeIdIterator;
import org.apache.jackrabbit.core.PropertyId;
import org.apache.jackrabbit.core.fs.FileSystem;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
import org.apache.jackrabbit.core.fs.FileSystemResource;
import org.apache.jackrabbit.core.fs.local.LocalFileSystem;
import org.apache.jackrabbit.core.persistence.PMContext;
@@ -50,11 +51,12 @@
import org.apache.jackrabbit.core.fs.local.LocalFileSystem;
import org.apache.jackrabbit.core.persistence.PMContext;
import org.apache.jackrabbit.core.persistence.bundle.util.BundleBinding;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionManager;
import org.apache.jackrabbit.core.persistence.bundle.util.DbNameIndex;
import org.apache.jackrabbit.core.persistence.bundle.util.ErrorHandling;
import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle;
-import org.apache.jackrabbit.core.util.StringIndex;
+import org.apache.jackrabbit.core.persistence.bundle.util.StreamWrapper;
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties;
import org.apache.jackrabbit.core.persistence.util.BLOBStore;
import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore;
import org.apache.jackrabbit.core.persistence.util.Serializer;
@@ -63,6 +65,7 @@
import org.apache.jackrabbit.core.state.NoSuchItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.apache.jackrabbit.core.state.NodeReferencesId;
+import org.apache.jackrabbit.core.util.StringIndex;
import org.apache.jackrabbit.util.Text;
import org.apache.jackrabbit.uuid.UUID;
import org.slf4j.Logger;
@@ -139,10 +142,7 @@
/** indicates whether to block if the database connection is lost */
protected boolean blockOnConnectionLoss = false;
- /**
- * The class that manages statement execution and recovery from connection loss.
- */
- protected ConnectionRecoveryManager connectionManager;
+ protected ConnectionManager connectionManager;
// SQL statements for bundle management
protected String bundleInsertSQL;
@@ -425,9 +425,11 @@
throw new RepositoryException(msg);
}
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- Statement stmt = connectionManager.getConnection().createStatement();
+ Connection connection = connectionManager.getConnection();
+ Statement stmt = null;
String sql = null;
try {
+ stmt = connection.createStatement();
sql = reader.readLine();
while (sql != null) {
if (!sql.startsWith("#") && sql.length() > 0
@@ -451,7 +453,9 @@
throw se;
} finally {
IOUtils.closeQuietly(in);
- stmt.close();
+ connectionManager.close(connection, stmt);
+ connectionManager.close(connection);
+
}
}
}
@@ -477,19 +481,24 @@
* @throws RepositoryException if a repository exception occurs.
*/
protected boolean checkTablesExist() throws SQLException, RepositoryException {
- DatabaseMetaData metaData = connectionManager.getConnection().getMetaData();
- String tableName = schemaObjectPrefix + "BUNDLE";
- if (metaData.storesLowerCaseIdentifiers()) {
- tableName = tableName.toLowerCase();
- } else if (metaData.storesUpperCaseIdentifiers()) {
- tableName = tableName.toUpperCase();
- }
- String userName = checkTablesWithUser() ? metaData.getUserName() : null;
- ResultSet rs = metaData.getTables(null, userName, tableName, null);
+ Connection connection = connectionManager.getConnection(); // released in the outer finally below
try {
- return rs.next();
+ DatabaseMetaData metaData = connection.getMetaData();
+ String tableName = schemaObjectPrefix + "BUNDLE"; // only the bundle table is probed
+ if (metaData.storesLowerCaseIdentifiers()) { // match the case the database stores identifiers in
+ tableName = tableName.toLowerCase();
+ } else if (metaData.storesUpperCaseIdentifiers()) {
+ tableName = tableName.toUpperCase();
+ }
+ String userName = checkTablesWithUser() ? metaData.getUserName() : null; // passed as the schema pattern; null when checkTablesWithUser() is false
+ ResultSet rs = metaData.getTables(null, userName, tableName, null);
+ try {
+ return rs.next(); // at least one matching row means the table exists
+ } finally {
+ rs.close(); // closed before the connection is handed back
+ }
} finally {
- rs.close();
+ connectionManager.close(connection);
}
}
@@ -514,33 +523,38 @@
public synchronized void store(ChangeLog changeLog) throws ItemStateException {
int trials = 2;
Throwable lastException = null;
- do {
- trials--;
- Connection con = null;
- try {
- con = connectionManager.getConnection();
- connectionManager.setAutoReconnect(false);
- con.setAutoCommit(false);
- super.store(changeLog);
- con.commit();
- con.setAutoCommit(true);
- return;
- } catch (Throwable th) {
- lastException = th;
- try {
- if (con != null) {
- con.rollback();
- }
- } catch (SQLException e) {
- logException("rollback failed", e);
- }
- if (th instanceof SQLException || th.getCause() instanceof SQLException) {
- connectionManager.close();
- }
- } finally {
- connectionManager.setAutoReconnect(true);
- }
- } while(blockOnConnectionLoss || trials > 0);
- throw new ItemStateException(lastException.getMessage());
+ Connection con = null;
+ try {
+ do {
+ trials--;
+ try {
+ con = connectionManager.getConnection();
+ con.setAutoCommit(false);
+ super.store(changeLog);
+ con.commit();
+ con.setAutoCommit(true);
+ return;
+ } catch (Throwable th) {
+ lastException = th;
+ try {
+ if (con != null) {
+ con.rollback();
+ }
+ } catch (SQLException e) {
+ logException("rollback failed", e);
+ }
+ // always hand this attempt's connection back: the next iteration
+ // overwrites 'con', so keeping it would leak it on every retry
+ // (close(null) is assumed harmless, as the outer finally relies on it)
+ connectionManager.close(con);
+ // cleared so that the outer finally does not close it a second time
+ con = null;
+ }
+ } while(blockOnConnectionLoss || trials > 0);
+ } finally {
+ connectionManager.close(con);
+ }
+ // chain the last failure as the cause so its stack trace is not lost
+ throw new ItemStateException(lastException.getMessage(), lastException);
}
@@ -555,8 +565,14 @@
this.name = context.getHomeDir().getName();
- connectionManager = new ConnectionRecoveryManager(blockOnConnectionLoss,
- getDriver(), getUrl(), getUser(), getPassword());
+ ConnectionProperties properties = new ConnectionProperties();
+ properties.setUrl(getUrl());
+ properties.setUser(getUser());
+ properties.setPassword(getPassword());
+ properties.setDriver(getDriver());
+
+
+ connectionManager = new ConnectionManager(context.getConnectionProvider(), properties, true);
// make sure schemaObjectPrefix consists of legal name characters only
prepareSchemaObjectPrefix();
@@ -659,7 +675,7 @@
*/
LocalFileSystem blobFS = new LocalFileSystem();
blobFS.setRoot(new File(context.getHomeDir(), "blobs"));
- blobFS.init();
+ blobFS.init(new FileSystemContext(context.getConnectionProvider()));
return new FSBlobStore(blobFS);
}
@@ -763,9 +779,11 @@
// get all node bundles in the database with a single sql statement,
// which is (probably) faster than loading each bundle and traversing the tree
ResultSet rs = null;
+ Connection connection = null;
try {
+ connection = connectionManager.getConnection();
String sql = "select count(*) from " + schemaObjectPrefix + "BUNDLE";
- Statement stmt = connectionManager.executeStmt(sql, new Object[0]);
+ Statement stmt = connectionManager.executeStmt(connection, sql, new Object[0]);
try {
rs = stmt.getResultSet();
if (!rs.next()) {
@@ -781,7 +799,7 @@
} else {
sql = "select NODE_ID_HI, NODE_ID_LO from " + schemaObjectPrefix + "BUNDLE";
}
- stmt = connectionManager.executeStmt(sql, new Object[0]);
+ stmt = connectionManager.executeStmt(connection, sql, new Object[0]);
rs = stmt.getResultSet();
// iterate over all node bundles in the db
@@ -797,7 +815,7 @@
ResultSet bRs = null;
byte[] data = null;
try {
- Statement bSmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID()));
+ Statement bSmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID()));
bRs = bSmt.getResultSet();
if (!bRs.next()) {
throw new SQLException("bundle cannot be retrieved?");
@@ -808,7 +826,6 @@
closeResultSet(bRs);
}
-
try {
// parse and check bundle
// checkBundle will log any problems itself
@@ -833,6 +850,7 @@
log.error("Error loading bundle", e);
} finally {
closeResultSet(rs);
+ connectionManager.close(connection);
total = count;
}
} else {
@@ -917,25 +935,31 @@
* @throws Exception if an error occurs
*/
protected void prepareSchemaObjectPrefix() throws Exception {
- DatabaseMetaData metaData = connectionManager.getConnection().getMetaData();
- String legalChars = metaData.getExtraNameCharacters();
- legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_";
+ Connection connection = null; // still null in the finally if getConnection() fails
+ try {
+ connection = connectionManager.getConnection();
+ DatabaseMetaData metaData = connection.getMetaData();
+ String legalChars = metaData.getExtraNameCharacters();
+ legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; // driver extras plus upper-case letters, digits and underscore
- String prefix = schemaObjectPrefix.toUpperCase();
- StringBuffer escaped = new StringBuffer();
- for (int i = 0; i < prefix.length(); i++) {
- char c = prefix.charAt(i);
- if (legalChars.indexOf(c) == -1) {
- escaped.append("_x");
- String hex = Integer.toHexString(c);
- escaped.append("0000".toCharArray(), 0, 4 - hex.length());
- escaped.append(hex);
- escaped.append("_");
- } else {
- escaped.append(c);
+ String prefix = schemaObjectPrefix.toUpperCase(); // the legal set above holds upper-case letters only
+ StringBuffer escaped = new StringBuffer();
+ for (int i = 0; i < prefix.length(); i++) {
+ char c = prefix.charAt(i);
+ if (legalChars.indexOf(c) == -1) { // illegal character: written as _x<four hex digits>_
+ escaped.append("_x");
+ String hex = Integer.toHexString(c);
+ escaped.append("0000".toCharArray(), 0, 4 - hex.length()); // left-pad the hex code with zeros to four digits
+ escaped.append(hex);
+ escaped.append("_");
+ } else {
+ escaped.append(c);
+ }
}
+ schemaObjectPrefix = escaped.toString(); // the configured prefix is replaced by its escaped form
+ } finally {
+ connectionManager.close(connection); // reached on success and on failure
}
- schemaObjectPrefix = escaped.toString();
}
/**
@@ -950,7 +974,6 @@
if (nameIndex instanceof DbNameIndex) {
((DbNameIndex) nameIndex).close();
}
- connectionManager.close();
// close blob store
blobStore.close();
blobStore = null;
@@ -1035,6 +1058,7 @@
public synchronized NodeIdIterator getAllNodeIds(NodeId bigger, int maxCount)
throws ItemStateException, RepositoryException {
ResultSet rs = null;
+ Connection connection = null;
try {
UUID lowUuid;
Object[] keys;
@@ -1055,7 +1079,8 @@
// see also bundleSelectAllIdsFrom SQL statement
maxCount += 10;
}
- Statement stmt = connectionManager.executeStmt(sql, keys, false, maxCount);
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, sql, keys, false, maxCount);
rs = stmt.getResultSet();
ArrayList result = new ArrayList();
while ((maxCount == 0 || result.size() < maxCount) && rs.next()) {
@@ -1082,6 +1107,7 @@
throw new ItemStateException(msg, e);
} finally {
closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -1131,8 +1157,10 @@
protected synchronized NodePropBundle loadBundle(NodeId id, boolean checkBeforeLoading)
throws ItemStateException {
ResultSet rs = null;
+ Connection connection = null;
try {
- Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID()));
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID()));
rs = stmt.getResultSet();
if (!rs.next()) {
return null;
@@ -1160,6 +1188,7 @@
throw new ItemStateException(msg, e);
} finally {
closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -1168,8 +1197,10 @@
*/
protected synchronized boolean existsBundle(NodeId id) throws ItemStateException {
ResultSet rs = null;
+ Connection connection = null;
try {
- Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID()));
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID()));
rs = stmt.getResultSet();
// a bundle exists, if the result has at least one entry
return rs.next();
@@ -1179,6 +1210,7 @@
throw new ItemStateException(msg, e);
} finally {
closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -1186,6 +1218,7 @@
* {@inheritDoc}
*/
protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException {
+ Connection connection = null;
try {
ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
DataOutputStream dout = new DataOutputStream(out);
@@ -1194,7 +1227,8 @@
String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL;
Object[] params = createParams(bundle.getId().getUUID(), out.toByteArray(), true);
- connectionManager.executeStmt(sql, params);
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, sql, params);
} catch (Exception e) {
String msg = "failed to write bundle: " + bundle.getId();
log.error(msg, e);
@@ -1199,6 +1233,8 @@
String msg = "failed to write bundle: " + bundle.getId();
log.error(msg, e);
throw new ItemStateException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -1206,8 +1242,10 @@
* {@inheritDoc}
*/
protected synchronized void destroyBundle(NodePropBundle bundle) throws ItemStateException {
+ Connection connection = null;
try {
- connectionManager.executeStmt(bundleDeleteSQL, getKey(bundle.getId().getUUID()));
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, bundleDeleteSQL, getKey(bundle.getId().getUUID()));
} catch (Exception e) {
if (e instanceof NoSuchItemStateException) {
throw (NoSuchItemStateException) e;
@@ -1215,6 +1253,8 @@
String msg = "failed to delete bundle: " + bundle.getId();
log.error(msg, e);
throw new ItemStateException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -1229,8 +1269,10 @@
ResultSet rs = null;
InputStream in = null;
+ Connection connection = null;
try {
- Statement stmt = connectionManager.executeStmt(
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection,
nodeReferenceSelectSQL, getKey(targetId.getTargetId().getUUID()));
rs = stmt.getResultSet();
if (!rs.next()) {
@@ -1252,6 +1294,7 @@
} finally {
IOUtils.closeQuietly(in);
closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -1272,7 +1315,7 @@
// check if insert or update
boolean update = exists(refs.getId());
String sql = (update) ? nodeReferenceUpdateSQL : nodeReferenceInsertSQL;
-
+ Connection connection = null;
try {
ByteArrayOutputStream out =
new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
@@ -1280,7 +1323,8 @@
Serializer.serialize(refs, out);
Object[] params = createParams(refs.getTargetId().getUUID(), out.toByteArray(), true);
- connectionManager.executeStmt(sql, params);
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, sql, params);
// there's no need to close a ByteArrayOutputStream
//out.close();
@@ -1288,6 +1332,8 @@
String msg = "failed to write node references: " + refs.getId();
log.error(msg, e);
throw new ItemStateException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -1298,9 +1344,10 @@
if (!initialized) {
throw new IllegalStateException("not initialized");
}
-
+ Connection connection = null;
try {
- connectionManager.executeStmt(nodeReferenceDeleteSQL,
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, nodeReferenceDeleteSQL,
getKey(refs.getTargetId().getUUID()));
} catch (Exception e) {
if (e instanceof NoSuchItemStateException) {
@@ -1309,6 +1356,8 @@
String msg = "failed to delete references: " + refs.getTargetId();
log.error(msg, e);
throw new ItemStateException(msg, e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -1321,8 +1370,10 @@
}
ResultSet rs = null;
+ Connection connection = null;
try {
- Statement stmt = connectionManager.executeStmt(nodeReferenceSelectSQL,
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, nodeReferenceSelectSQL,
getKey(targetId.getTargetId().getUUID()));
rs = stmt.getResultSet();
@@ -1335,6 +1386,7 @@
throw new ItemStateException(msg, e);
} finally {
closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -1521,31 +1573,60 @@
* {@inheritDoc}
*/
public InputStream get(String blobId) throws Exception {
- Statement stmt = connectionManager.executeStmt(blobSelectSQL, new Object[]{blobId});
- final ResultSet rs = stmt.getResultSet();
- if (!rs.next()) {
- closeResultSet(rs);
- throw new Exception("no such BLOB: " + blobId);
- }
- InputStream in = rs.getBinaryStream(1);
- if (in == null) {
- // some databases treat zero-length values as NULL;
- // return empty InputStream in such a case
- closeResultSet(rs);
- return new ByteArrayInputStream(new byte[0]);
- }
-
- /**
- * return an InputStream wrapper in order to
- * close the ResultSet when the stream is closed
- */
- return new FilterInputStream(in) {
- public void close() throws IOException {
- in.close();
- // now it's safe to close ResultSet
+ Connection connection = null;
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, blobSelectSQL, new Object[]{blobId});
+ final ResultSet rs = stmt.getResultSet();
+ if (!rs.next()) {
+ closeResultSet(rs);
+ throw new Exception("no such BLOB: " + blobId);
+ }
+ InputStream in = rs.getBinaryStream(1);
+ if (in == null) {
+ // some databases treat zero-length values as NULL;
+ // return empty InputStream in such a case
closeResultSet(rs);
+ return new ByteArrayInputStream(new byte[0]);
}
- };
+
+ final Connection c = connection;
+
+ // prevent closing the connection in the finally block
+ connection = null;
+
+ /**
+ * return an InputStream wrapper in order to close the ResultSet
+ * and release the connection when the stream is closed
+ */
+ return new FilterInputStream(in) {
+ private boolean closed = false;
+ public void close() throws IOException {
+ if (closed) {
+ // already released; closing again must not hand the
+ // connection back a second time
+ return;
+ }
+ closed = true;
+ try {
+ in.close();
+ } finally {
+ // now it's safe to close ResultSet; the finally makes sure
+ // a failing in.close() cannot leak the connection
+ closeResultSet(rs);
+ connectionManager.close(c);
+ }
+ }
+ protected void finalize() throws Throwable {
+ if (!closed) {
+ close();
+ }
+ super.finalize();
+ }
+ };
+ } finally {
+ connectionManager.close(connection);
+ }
}
/**
@@ -1553,15 +1625,27 @@
*/
public synchronized void put(String blobId, InputStream in, long size)
throws Exception {
- Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId});
- ResultSet rs = stmt.getResultSet();
- // a BLOB exists if the result has at least one entry
- boolean exists = rs.next();
- closeResultSet(rs);
+ Connection connection = null;
+ try {
+ connection = connectionManager.getConnection();
+
+ Statement stmt = connectionManager.executeStmt(connection, blobSelectExistSQL, new Object[]{blobId});
+ ResultSet rs = stmt.getResultSet();
+ boolean exists;
+ try {
+ // a BLOB exists if the result has at least one entry
+ exists = rs.next();
+ } finally {
+ // release the result set even if rs.next() fails
+ closeResultSet(rs);
+ }
- String sql = (exists) ? blobUpdateSQL : blobInsertSQL;
- Object[] params = new Object[]{new ConnectionRecoveryManager.StreamWrapper(in, size), blobId};
- connectionManager.executeStmt(sql, params);
+ String sql = (exists) ? blobUpdateSQL : blobInsertSQL;
+ Object[] params = new Object[]{new StreamWrapper(in, size), blobId};
+ connectionManager.executeStmt(connection, sql, params);
+ } finally {
+ connectionManager.close(connection);
+ }
}
/**
@@ -1568,8 +1647,14 @@
* {@inheritDoc}
*/
public synchronized boolean remove(String blobId) throws Exception {
- Statement stmt = connectionManager.executeStmt(blobDeleteSQL, new Object[]{blobId});
- return stmt.getUpdateCount() == 1;
+ Connection connection = null; // still null in the finally if getConnection() fails
+ try {
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, blobDeleteSQL, new Object[]{blobId});
+ return stmt.getUpdateCount() == 1; // true only when exactly one row was affected
+ } finally {
+ connectionManager.close(connection); // runs on success and on failure
+ }
}
public void close() {
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/BundleFsPersistenceManager.java (working copy)
@@ -21,6 +21,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.fs.FileSystem;
import org.apache.jackrabbit.core.fs.BasedFileSystem;
+import org.apache.jackrabbit.core.fs.FileSystemContext;
import org.apache.jackrabbit.core.fs.FileSystemException;
import org.apache.jackrabbit.core.fs.local.LocalFileSystem;
import org.apache.jackrabbit.core.persistence.PMContext;
@@ -193,7 +194,7 @@
if (useLocalFsBlobStore()) {
LocalFileSystem blobFS = new LocalFileSystem();
blobFS.setRoot(new File(context.getHomeDir(), "blobs"));
- blobFS.init();
+ blobFS.init(new FileSystemContext(context.getConnectionProvider()));
blobStore = new BundleFsPersistenceManager.FSBlobStore(blobFS);
} else {
blobStore = new BundleFsPersistenceManager.FSBlobStore(itemFs);
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/DerbyPersistenceManager.java (working copy)
@@ -20,6 +20,7 @@
import org.slf4j.LoggerFactory;
import org.apache.jackrabbit.core.persistence.PMContext;
+import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
@@ -260,8 +261,11 @@
protected void checkSchema() throws SQLException, RepositoryException {
// set properties
if (DERBY_EMBEDDED_DRIVER.equals(getDriver())) {
- Statement stmt = connectionManager.getConnection().createStatement();
+ Connection connection = null;
+ Statement stmt = null;
try {
+ connection = connectionManager.getConnection();
+ stmt = connection.createStatement();
stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY "
+ "('derby.storage.initialPages', '" + derbyStorageInitialPages + "')");
stmt.execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY "
@@ -274,7 +278,8 @@
+ "('derby.storage.pageSize', '" + derbyStoragePageSize + "')");
} finally {
- stmt.close();
+ connectionManager.close(connection, stmt);
+ connectionManager.close(connection);
}
}
super.checkSchema();
@@ -305,20 +310,29 @@
return;
}
- // prepare connection url for issuing shutdown command
- String url = connectionManager.getConnection().getMetaData().getURL();
- int pos = url.lastIndexOf(';');
- if (pos != -1) {
- // strip any attributes from connection url
- url = url.substring(0, pos);
- }
- url += ";shutdown=true";
+ Connection connection = null;
+ // declared before the try: the shutdown url is built here but not used
+ // inside this block, so it has to stay in scope after the finally
+ String url;
+ try {
+ connection = connectionManager.getConnection();
+ // prepare connection url for issuing shutdown command
+ url = connection.getMetaData().getURL();
+ int pos = url.lastIndexOf(';');
+ if (pos != -1) {
+ // strip any attributes from connection url
+ url = url.substring(0, pos);
+ }
+ url += ";shutdown=true";
- // we have to reset the connection to 'autoCommit=true' before closing it;
- // otherwise Derby would mysteriously complain about some pending uncommitted
- // changes which can't possibly be true.
- // @todo further investigate
- connectionManager.getConnection().setAutoCommit(true);
+ // we have to reset the connection to 'autoCommit=true' before closing it;
+ // otherwise Derby would mysteriously complain about some pending uncommitted
+ // changes which can't possibly be true.
+ // @todo further investigate
+ connection.setAutoCommit(true);
+ } finally {
+ connectionManager.close(connection);
+ }
super.close();
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/H2PersistenceManager.java (working copy)
@@ -18,6 +18,7 @@
import org.apache.jackrabbit.core.persistence.PMContext;
+import java.sql.Connection;
import java.sql.Statement;
import java.sql.SQLException;
@@ -95,11 +96,15 @@
* {@inheritDoc}
*/
protected void checkSchema() throws SQLException, RepositoryException {
- Statement stmt = connectionManager.getConnection().createStatement();
+ Connection connection = null; // both stay null in the finally if acquisition fails
+ Statement stmt = null;
try {
+ connection = connectionManager.getConnection();
+ stmt = connection.createStatement();
stmt.execute("SET LOCK_TIMEOUT " + lockTimeout);
} finally {
- stmt.close();
+ connectionManager.close(connection, stmt); // NOTE(review): presumably closes only stmt, since the connection is closed on the next line - confirm
+ connectionManager.close(connection);
}
super.checkSchema();
}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/Oracle9PersistenceManager.java (working copy)
@@ -88,11 +88,17 @@
// use the Connection object for using the exact same
// class loader that the Oracle driver was loaded with
- blobClass = connectionManager.getConnection().getClass().getClassLoader().loadClass("oracle.sql.BLOB");
- duractionSessionConstant =
- new Integer(blobClass.getField("DURATION_SESSION").getInt(null));
- modeReadWriteConstant =
- new Integer(blobClass.getField("MODE_READWRITE").getInt(null));
+ Connection connection = null;
+ try {
+ connection = connectionManager.getConnection();
+ blobClass = connection.getClass().getClassLoader().loadClass("oracle.sql.BLOB");
+ duractionSessionConstant =
+ new Integer(blobClass.getField("DURATION_SESSION").getInt(null));
+ modeReadWriteConstant =
+ new Integer(blobClass.getField("MODE_READWRITE").getInt(null));
+ } finally {
+ connectionManager.close(connection);
+ }
}
/**
@@ -108,6 +114,7 @@
protected synchronized void storeBundle(NodePropBundle bundle)
throws ItemStateException {
Blob blob = null;
+ Connection connection = null;
try {
ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
DataOutputStream dout = new DataOutputStream(out);
@@ -117,7 +124,8 @@
String sql = bundle.isNew() ? bundleInsertSQL : bundleUpdateSQL;
blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray()));
Object[] params = createParams(bundle.getId().getUUID(), blob, true);
- connectionManager.executeStmt(sql, params);
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, sql, params);
} catch (Exception e) {
String msg = "failed to write bundle: " + bundle.getId();
log.error(msg, e);
@@ -129,6 +137,7 @@
} catch (Exception e1) {
}
}
+ connectionManager.close(connection);
}
}
@@ -142,6 +151,7 @@
}
Blob blob = null;
+ Connection connection = null;
try {
// check if insert or update
boolean update = exists(refs.getId());
@@ -156,7 +166,8 @@
blob = createTemporaryBlob(new ByteArrayInputStream(out.toByteArray()));
Object[] params = createParams(refs.getTargetId().getUUID(), blob, true);
- connectionManager.executeStmt(sql, params);
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, sql, params);
// there's no need to close a ByteArrayOutputStream
//out.close();
@@ -171,6 +182,7 @@
} catch (Exception e1) {
}
}
+ connectionManager.close(connection);
}
}
@@ -190,26 +202,33 @@
blob.close();
return blob;
*/
- Method createTemporary = blobClass.getMethod("createTemporary",
- new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE});
- Object blob = createTemporary.invoke(null,
- new Object[]{connectionManager.getConnection(), Boolean.FALSE, duractionSessionConstant});
- Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE});
- open.invoke(blob, new Object[]{modeReadWriteConstant});
- Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]);
- OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null);
+
+ Connection connection = null;
try {
- IOUtils.copy(in, out);
- } finally {
+ connection = connectionManager.getConnection();
+ Method createTemporary = blobClass.getMethod("createTemporary",
+ new Class[]{Connection.class, Boolean.TYPE, Integer.TYPE});
+ Object blob = createTemporary.invoke(null,
+ new Object[]{connection, Boolean.FALSE, duractionSessionConstant});
+ Method open = blobClass.getMethod("open", new Class[]{Integer.TYPE});
+ open.invoke(blob, new Object[]{modeReadWriteConstant});
+ Method getBinaryOutputStream = blobClass.getMethod("getBinaryOutputStream", new Class[0]);
+ OutputStream out = (OutputStream) getBinaryOutputStream.invoke(blob, null);
try {
- out.flush();
- } catch (IOException ioe) {
+ IOUtils.copy(in, out);
+ } finally {
+ try {
+ out.flush();
+ } catch (IOException ioe) {
+ }
+ out.close();
}
- out.close();
+ Method close = blobClass.getMethod("close", new Class[0]);
+ close.invoke(blob, null);
+ return (Blob) blob;
+ } finally {
+ connectionManager.close(connection);
}
- Method close = blobClass.getMethod("close", new Class[0]);
- close.invoke(blob, null);
- return (Blob) blob;
}
/**
@@ -234,8 +253,11 @@
throws Exception {
Blob blob = null;
+ Connection connection = null;
+ Statement stmt = null;
try {
- Statement stmt = connectionManager.executeStmt(blobSelectExistSQL, new Object[]{blobId});
+ connection = connectionManager.getConnection();
+ stmt = connectionManager.executeStmt(connection, blobSelectExistSQL, new Object[]{blobId});
ResultSet rs = stmt.getResultSet();
// a BLOB exists if the result has at least one entry
boolean exists = rs.next();
@@ -243,7 +265,7 @@
String sql = (exists) ? blobUpdateSQL : blobInsertSQL;
blob = createTemporaryBlob(in);
- connectionManager.executeStmt(sql, new Object[]{blob, blobId});
+ connectionManager.executeStmt(connection, sql, new Object[]{blob, blobId});
} finally {
if (blob != null) {
try {
@@ -251,6 +273,8 @@
} catch (Exception e) {
}
}
+ connectionManager.close(connection, stmt);
+ connectionManager.close(connection);
}
}
}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/OraclePersistenceManager.java (working copy)
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.core.persistence.bundle;
+import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
@@ -104,8 +105,10 @@
super.init(context);
// check driver version
+ Connection connection = null;
try {
- DatabaseMetaData metaData = connectionManager.getConnection().getMetaData();
+ connection = connectionManager.getConnection();
+ DatabaseMetaData metaData = connection.getMetaData();
if (metaData.getDriverMajorVersion() < 10) {
// Oracle drivers prior to version 10 only support
// writing BLOBs up to 32k in size...
@@ -115,6 +118,8 @@
}
} catch (SQLException e) {
log.warn("Can not retrieve driver version", e);
+ } finally {
+ connectionManager.close(connection);
}
}
@@ -159,20 +164,26 @@
* @inheritDoc
*/
protected void prepareSchemaObjectPrefix() throws Exception {
- DatabaseMetaData metaData = connectionManager.getConnection().getMetaData();
- String legalChars = metaData.getExtraNameCharacters();
- legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_";
+ Connection connection = null; // still null in the finally if getConnection() fails
+ try {
+ connection = connectionManager.getConnection();
+ DatabaseMetaData metaData = connection.getMetaData();
+ String legalChars = metaData.getExtraNameCharacters();
+ legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_"; // driver extras plus upper-case letters, digits and underscore
- String prefix = schemaObjectPrefix.toUpperCase();
- StringBuffer escaped = new StringBuffer();
- for (int i = 0; i < prefix.length(); i++) {
- char c = prefix.charAt(i);
- if (legalChars.indexOf(c) == -1) {
- escaped.append('_');
- } else {
- escaped.append(c);
+ String prefix = schemaObjectPrefix.toUpperCase(); // the legal set above holds upper-case letters only
+ StringBuffer escaped = new StringBuffer();
+ for (int i = 0; i < prefix.length(); i++) {
+ char c = prefix.charAt(i);
+ if (legalChars.indexOf(c) == -1) { // every illegal character collapses to a single underscore
+ escaped.append('_');
+ } else {
+ escaped.append(c);
+ }
}
+ schemaObjectPrefix = escaped.toString(); // the configured prefix is replaced by its escaped form
+ } finally {
+ connectionManager.close(connection); // reached on success and on failure
}
- schemaObjectPrefix = escaped.toString();
}
}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (revision 676201)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/PostgreSQLPersistenceManager.java (working copy)
@@ -28,6 +28,7 @@
import java.io.DataInputStream;
import java.io.InputStream;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -90,8 +91,11 @@
protected synchronized NodePropBundle loadBundle(NodeId id)
throws ItemStateException {
+ Connection connection = null;
+ Statement stmt = null;
try {
- Statement stmt = connectionManager.executeStmt(bundleSelectSQL, getKey(id.getUUID()));
+ connection = connectionManager.getConnection();
+ stmt = connectionManager.executeStmt(connection, bundleSelectSQL, getKey(id.getUUID()));
ResultSet rs = stmt.getResultSet();
try {
if (rs.next()) {
@@ -115,6 +119,9 @@
String msg = "failed to read bundle: " + id + ": " + e;
log.error(msg);
throw new ItemStateException(msg, e);
+ } finally {
+ connectionManager.close(connection, stmt);
+ connectionManager.close(connection);
}
}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java (revision 681176)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java (working copy)
@@ -462,8 +462,8 @@
*/
public static class StreamWrapper {
- private final InputStream stream;
- private final long size;
+ final InputStream stream;
+ final long size;
/**
* Creates a wrapper for the given InputStream that can
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (revision 681176)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/DbNameIndex.java (working copy)
@@ -16,11 +16,11 @@
*/
package org.apache.jackrabbit.core.persistence.bundle.util;
-import java.util.HashMap;
-
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.HashMap;
import org.apache.jackrabbit.core.util.StringIndex;
@@ -40,7 +40,7 @@
/**
* The class that manages statement execution and recovery from connection loss.
*/
- protected ConnectionRecoveryManager connectionManager;
+ protected ConnectionManager connectionManager;
// name index statements
protected String nameSelectSQL;
@@ -57,7 +57,7 @@
* @param schemaObjectPrefix the prefix for table names
* @throws SQLException if the statements cannot be prepared.
*/
- public DbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix)
+ public DbNameIndex(ConnectionManager conMgr, String schemaObjectPrefix)
throws SQLException {
connectionManager = conMgr;
init(schemaObjectPrefix);
@@ -132,29 +132,24 @@
*/
protected int insertString(String string) {
// assert index does not exist
- int result = -1;
+ ResultSet rs = null;
+ Connection connection = null;
try {
- Statement stmt = connectionManager.executeStmt(
- nameInsertSQL, new Object[] { string }, true, 0);
- ResultSet rs = stmt.getGeneratedKeys();
- try {
- if (rs.next()) {
- result = rs.getInt(1);
- }
- } finally {
- rs.close();
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string}, true, 0);
+ rs = stmt.getGeneratedKeys();
+ if (rs.next()) {
+ // the driver reported the generated key
+ return rs.getInt(1);
}
} catch (Exception e) {
- IllegalStateException ise = new IllegalStateException(
- "Unable to insert index for string: " + string);
+ IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string);
ise.initCause(e);
throw ise;
- }
- if (result != -1) {
- return result;
- } else {
- // Could not get the index with getGeneratedKeys, try with SELECT
- return getIndex(string);
+ } finally {
+ // close the result set while the connection is still ours,
+ // then hand the connection back
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
+ // Could not get the index with getGeneratedKeys, try with SELECT
+ // (done after the insert connection has been released)
+ return getIndex(string);
}
@@ -164,24 +159,24 @@
* @return the index or -1 if not found.
*/
protected int getIndex(String string) {
+ ResultSet rs = null;
+ Connection connection = null;
try {
- Statement stmt = connectionManager.executeStmt(
- indexSelectSQL, new Object[] { string });
- ResultSet rs = stmt.getResultSet();
- try {
- if (rs.next()) {
- return rs.getInt(1);
- } else {
- return -1;
- }
- } finally {
- rs.close();
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, indexSelectSQL, new Object[]{string});
+ rs = stmt.getResultSet();
+ if (!rs.next()) {
+ // no row for this string
+ return -1;
+ } else {
+ return rs.getInt(1);
}
} catch (Exception e) {
- IllegalStateException ise = new IllegalStateException(
- "Unable to read index for string: " + string);
+ IllegalStateException ise = new IllegalStateException("Unable to read index for string: " + string);
ise.initCause(e);
throw ise;
+ } finally {
+ // close the result set while the connection is still ours,
+ // then hand the connection back
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
}
@@ -188,33 +183,41 @@
/**
* Retrieves the string from the database for the given index.
* @param index the index to retrieve the string for.
- * @return the string
- * @throws IllegalArgumentException if the string is not found
+ * @return the string or null if not found.
+ * @throws IllegalStateException if the lookup itself fails
*/
- protected String getString(int index)
- throws IllegalArgumentException, IllegalStateException {
- String result = null;
+ protected String getString(int index) {
+ // NOTE(review): the previous revision threw IllegalArgumentException for
+ // an unknown index; callers must now cope with null - confirm intended.
+ ResultSet rs = null;
+ Connection connection = null;
try {
- Statement stmt = connectionManager.executeStmt(
- nameSelectSQL, new Object[] { new Integer(index) });
- ResultSet rs = stmt.getResultSet();
- try {
- if (rs.next()) {
- result = rs.getString(1);
- }
- } finally {
- rs.close();
+ connection = connectionManager.getConnection();
+ Statement stmt = connectionManager.executeStmt(connection, nameSelectSQL, new Object[]{new Integer(index)});
+ rs = stmt.getResultSet();
+ if (!rs.next()) {
+ return null;
+ } else {
+ return rs.getString(1);
}
} catch (Exception e) {
- IllegalStateException ise = new IllegalStateException(
- "Unable to read name for index: " + index);
+ IllegalStateException ise = new IllegalStateException("Unable to read name for index: " + index);
ise.initCause(e);
throw ise;
+ } finally {
+ // close the result set while the connection is still ours,
+ // then hand the connection back
+ closeResultSet(rs);
+ connectionManager.close(connection);
}
- if (result == null) {
- throw new IllegalArgumentException("Index not found: " + index);
+ }
+
+ /**
+ * Closes the given result set, if there is one. A failure to close is
+ * deliberately swallowed so that this method can be used in
+ * <code>finally</code> blocks without masking an earlier exception.
+ *
+ * @param rs the result set, may be <code>null</code>
+ */
+ protected void closeResultSet(ResultSet rs) {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException se) {
+ // ignore: best-effort cleanup, nothing useful can be done here
+ }
}
- return result;
}
-
}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (revision 681177)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NGKDbNameIndex.java (working copy)
@@ -16,6 +16,8 @@
*/
package org.apache.jackrabbit.core.persistence.bundle.util;
+import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
/**
@@ -31,7 +33,7 @@
* @param schemaObjectPrefix the prefix for table names
* @throws SQLException if the statements cannot be prepared.
*/
- public NGKDbNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix)
+ public NGKDbNameIndex(ConnectionManager conMgr, String schemaObjectPrefix)
throws SQLException {
super(conMgr, schemaObjectPrefix);
}
@@ -57,15 +59,19 @@
*/
protected int insertString(String string) {
// assert index does not exist
+ Connection connection = null;
try {
- connectionManager.executeStmt(nameInsertSQL, new Object[] { string });
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string});
} catch (Exception e) {
- IllegalStateException ise = new IllegalStateException(
- "Unable to insert index for string: " + string);
+ IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string);
ise.initCause(e);
throw ise;
+ } finally {
+ connectionManager.close(connection);
}
-
+ // No generated keys here: read the key back with a SELECT, but only
+ // after the insert connection has been released, so that a second
+ // connection is never requested while the first one is still held and
+ // lookup failures keep their own error message.
return getIndex(string);
}
}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NodePropBundle.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NodePropBundle.java (revision 683292)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/NodePropBundle.java (working copy)
@@ -109,7 +109,7 @@
* the size
*/
private long size = 0;
-
+
/**
* Shared set, consisting of the parent ids of this shareable node. This
* entry is null if this node is not shareable.
@@ -433,7 +433,7 @@
pe.destroy(binding.getBlobStore());
}
}
-
+
/**
* Sets the shared set of this bundle.
* @return the shared set of this bundle.
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (revision 681176)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/PostgreSQLNameIndex.java (working copy)
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.core.persistence.bundle.util;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -28,7 +29,7 @@
protected String generatedKeySelectSQL;
+ /**
+ * Creates a new name index; both arguments are handed unchanged to the
+ * superclass constructor.
+ *
+ * @param conMgr the connection manager used to run the statements
+ * @param schemaObjectPrefix the prefix for table names
+ * @throws SQLException declared by the superclass constructor
+ */
- public PostgreSQLNameIndex(ConnectionRecoveryManager conMgr, String schemaObjectPrefix)
+ public PostgreSQLNameIndex(ConnectionManager conMgr, String schemaObjectPrefix)
throws SQLException {
super(conMgr, schemaObjectPrefix);
}
@@ -59,13 +60,17 @@
*/
protected int insertString(String string) {
// assert index does not exist
+ Connection connection = null;
try {
- connectionManager.executeStmt(nameInsertSQL, new Object[]{string});
- return getGeneratedKey();
+ connection = connectionManager.getConnection();
+ connectionManager.executeStmt(connection, nameInsertSQL, new Object[]{string});
+ // the key is read back on the very connection that ran the insert
+ return getGeneratedKey(connection);
} catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Unable to insert index for string: " + string);
ise.initCause(e);
- throw ise ;
+ throw ise;
+ } finally {
+ // runs on both the normal and the exceptional path
+ connectionManager.close(connection);
}
}
@@ -73,9 +78,9 @@
* Retrieves the last assigned key from the database.
* @return the index.
*/
- protected int getGeneratedKey() {
+ protected int getGeneratedKey(Connection connection) {
try {
- ResultSet rs = connectionManager.executeQuery(generatedKeySelectSQL);
+ ResultSet rs = connectionManager.executeQuery(connection, generatedKeySelectSQL);
try {
if (!rs.next()) {
return -1;
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionManager.java (revision 0)
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.bundle.util;
+
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionProvider.ConnectionProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class provides methods to get a database connection and to execute SQL statements.
+ */
+public class ConnectionManager {
+
+ private final ConnectionProvider connectionProvider;
+ private final ConnectionProperties connectionProperties;
+
+ /**
+ * Builds a connection manager on top of the given provider.
+ *
+ * @param connectionProvider source of the database connections
+ * @param connectionProperties properties handed to the provider whenever a
+ * connection is requested
+ * @param threadBound if <code>true</code> the provider is wrapped in a
+ * {@link ThreadLocalConnectionProviderAdapter}, otherwise it is
+ * used as given
+ */
+ public ConnectionManager(ConnectionProvider connectionProvider, ConnectionProperties connectionProperties,
+ boolean threadBound) {
+ this.connectionProperties = connectionProperties;
+ this.connectionProvider = threadBound
+ ? new ThreadLocalConnectionProviderAdapter(connectionProvider)
+ : connectionProvider;
+ }
+
+ /**
+ * Obtains a database {@link Connection} from the connection provider and
+ * registers it in the <code>connectionEntries</code> bookkeeping map.
+ * Every connection returned here must be released with
+ * {@link #close(Connection)}.
+ *
+ * @return the connection handed out by the provider
+ * @throws SQLException propagated from the connection provider
+ * @throws RepositoryException propagated from the connection provider
+ */
+ public Connection getConnection() throws SQLException, RepositoryException {
+ Connection connection = connectionProvider.getConnection(connectionProperties);
+
+ // the same connection object may be handed out more than once;
+ // count the users instead of creating a second entry
+ ConnectionEntry e = (ConnectionEntry) connectionEntries.get(connection);
+ if (e != null) {
+ ++e.usageCount;
+ } else {
+ // NOTE(review): presumably a fresh entry starts with usageCount == 1;
+ // the ConnectionEntry fields are not visible here - confirm.
+ // NOTE(review): get-then-put is not atomic; looks safe only if a
+ // connection is never shared between threads - confirm.
+ e = new ConnectionEntry();
+ connectionEntries.put(connection, e);
+ }
+
+ return connection;
+ };
+
+ /**
+ * Closes the database {@link Connection}. This method must always be called to close a connection
+ * (instead of calling {@link Connection#close()}).
+ * <p>
+ * When the last user of the connection releases it, its bookkeeping entry
+ * is dropped and the statements cached for it are closed; only then is the
+ * connection handed back to the connection provider.
+ *
+ * @param connection the connection to release, <code>null</code> is ignored
+ */
+ public void close(Connection connection) {
+ if (connection != null) {
+ ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection);
+ if (entry != null) {
+ // NOTE(review): assumes a new ConnectionEntry starts with
+ // usageCount == 1 - confirm against the entry class.
+ --entry.usageCount;
+ if (entry.usageCount <= 0) {
+ // the map is keyed by connection, not by entry
+ connectionEntries.remove(connection);
+ // close the cached statements while the connection is still ours
+ for (java.util.Iterator i = entry.statements.values().iterator(); i.hasNext();) {
+ try {
+ ((Statement) i.next()).close();
+ } catch (SQLException e) {
+ log.error("Error closing SQL statement", e);
+ }
+ }
+ entry.statements.clear();
+ }
+ }
+ try {
+ connectionProvider.closeConnection(connection);
+ } catch (SQLException e) {
+ log.error("Error closing SQL connection", e);
+ }
+ }
+ }
+
+ /**
+ * Closes a statement that was executed on the given connection and makes
+ * sure it is no longer served from the per-connection statement cache.
+ *
+ * @param connection the connection the statement belongs to, may be <code>null</code>
+ * @param statement the statement to close, <code>null</code> is ignored
+ */
+ public void close(Connection connection, Statement statement) {
+ if (statement != null) {
+ try {
+ if (connection != null) {
+ ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection);
+ if (entry != null) {
+ // the cache is keyed by SQL text, so the statement has
+ // to be removed by value; otherwise the closed
+ // statement would be reused by the next execution
+ entry.statements.values().remove(statement);
+ }
+ }
+ statement.close();
+ } catch (SQLException e) {
+ log.error("Error closing SQL statement", e);
+ }
+ }
+ }
+
+ /**
+ * Entry for each active connection.
+ */
+ private static class ConnectionEntry {
+ private ConnectionEntry() {
+ }
+
+ private Map/*block parameter has been set to true on construction)
+ * if this fails and autoReconnect is enabled.
+ *
+ * @param sql the SQL query to execute
+ * @return the executed ResultSet
+ * @throws SQLException on error
+ * @throws RepositoryException if the database driver could not be loaded
+ */
+ public synchronized ResultSet executeQuery(Connection connection, String sql) throws SQLException, RepositoryException {
+ // synchronized public entry point; the work is done by executeQueryInternal
+ return executeQueryInternal(connection, sql);
+ }
+
+ /**
+ * Executes the given parameterless SQL query on the given connection. The
+ * prepared statement is cached in the connection's entry under the SQL
+ * text and reused by later calls.
+ *
+ * @param connection connection to run the query on; its entry in
+ * <code>connectionEntries</code> is dereferenced without a null check
+ * @param sql query to execute
+ * @return a ResultSet object
+ * @throws SQLException if an error occurs (logged before it is rethrown)
+ * @throws RepositoryException declared only; NOTE(review): nothing in this
+ * body appears to throw it - confirm
+ */
+ private ResultSet executeQueryInternal(Connection connection, String sql) throws SQLException, RepositoryException {
+ PreparedStatement stmt = null;
+ try {
+ ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection);
+
+ // reuse the statement prepared earlier for this SQL, if any
+ stmt = (PreparedStatement) entry.statements.get(sql);
+ if (stmt == null) {
+ stmt = connection.prepareStatement(sql);
+ entry.statements.put(sql, stmt);
+ }
+ return stmt.executeQuery();
+ } catch (SQLException e) {
+ logException("could not execute statement", e);
+ throw e;
+ } finally {
+ // the statement stays cached, so leave it in a clean state
+ resetStatement(stmt);
+ }
+ }
+
+ /**
+ * Clears the parameters and the warnings of a prepared statement. A
+ * failure to do so is logged and otherwise ignored.
+ *
+ * @param stmt the statement to reset; <code>null</code> is silently
+ * accepted
+ */
+ private void resetStatement(PreparedStatement stmt) {
+ if (stmt == null) {
+ return;
+ }
+ try {
+ stmt.clearParameters();
+ stmt.clearWarnings();
+ } catch (SQLException se) {
+ logException("Failed resetting PreparedStatement", se);
+ }
+ }
+
+ /**
+ * Executes the given SQL statement with the specified parameters.
+ * Shorthand for the five-argument overload with generated keys disabled
+ * and no row limit.
+ *
+ * @param connection connection to run the statement on
+ * @param sql statement to execute
+ * @param params parameters to set
+ * @return the Statement object that had been executed
+ * @throws SQLException if an error occurs
+ * @throws RepositoryException declared by the delegate
+ */
+ public PreparedStatement executeStmt(Connection connection, String sql, Object[] params)
+ throws SQLException, RepositoryException {
+ return executeStmt(connection, sql, params, false, 0);
+ }
+
+ /**
+ * Executes the given SQL statement with the specified parameters.
+ * Synchronized public entry point that delegates to the private
+ * <code>executeStmtInternal</code> method.
+ *
+ * @param connection connection to run the statement on
+ * @param sql statement to execute
+ * @param params parameters to set
+ * @param returnGeneratedKeys if the statement should return auto generated keys
+ * @param maxRows the maximum number of rows to return (0 for all rows)
+ * @return the Statement object that had been executed
+ * @throws SQLException if an error occurs
+ * @throws RepositoryException declared by the delegate
+ */
+ public synchronized PreparedStatement executeStmt(Connection connection,
+ String sql, Object[] params, boolean returnGeneratedKeys, int maxRows)
+ throws SQLException, RepositoryException {
+ return executeStmtInternal(connection, sql, params, returnGeneratedKeys, maxRows);
+ }
+
+ /**
+ * Looks up (or prepares and caches) the statement for the given SQL on the
+ * given connection, applies the row limit and executes it with the
+ * specified parameters.
+ *
+ * @param connection connection to run the statement on; its entry in
+ * <code>connectionEntries</code> is dereferenced without a null check
+ * @param sql statement to execute
+ * @param params parameters to set
+ * @param returnGeneratedKeys if the statement should return auto generated keys
+ * @param maxRows the maximum number of rows to return (0 for all rows)
+ * @return the Statement object that had been executed
+ * @throws SQLException if an error occurs (logged before it is rethrown)
+ * @throws RepositoryException declared only; NOTE(review): nothing in this
+ * body appears to throw it - confirm
+ */
+ private PreparedStatement executeStmtInternal(Connection connection,
+ String sql, Object[] params, boolean returnGeneratedKeys, int maxRows)
+ throws SQLException, RepositoryException {
+ try {
+ // statements prepared with and without generated keys are cached
+ // under different keys
+ String key = sql;
+ if (returnGeneratedKeys) {
+ key += " RETURN_GENERATED_KEYS";
+ }
+
+ ConnectionEntry entry = (ConnectionEntry) connectionEntries.get(connection);
+
+ PreparedStatement stmt = (PreparedStatement) entry.statements.get(key);
+ if (stmt == null) {
+ if (returnGeneratedKeys) {
+ stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
+ } else {
+ stmt = connection.prepareStatement(sql);
+ }
+ entry.statements.put(key, stmt);
+ }
+ // set on every call because the cached statement may carry the
+ // limit of a previous execution
+ stmt.setMaxRows(maxRows);
+ return executeStmtInternal(connection, params, stmt);
+ } catch (SQLException e) {
+ logException("could not execute statement", e);
+ throw e;
+ }
+ }
+
+ /**
+ * Binds the given parameters to the statement (1-based, in array order),
+ * executes it and clears its parameters and warnings again.
+ *
+ * @param connection not used by this method
+ * @param params the parameters for the stmt parameter, may be <code>null</code>
+ * @param stmt the statement to execute
+ * @return the executed Statement
+ * @throws SQLException on error
+ */
+ private PreparedStatement executeStmtInternal(Connection connection, Object[] params, PreparedStatement stmt)
+ throws SQLException {
+ for (int i = 0; params != null && i < params.length; i++) {
+ Object p = params[i];
+ if (p instanceof StreamWrapper) {
+ // the long size is narrowed to int for the JDBC call
+ StreamWrapper wrapper = (StreamWrapper) p;
+ stmt.setBinaryStream(i + 1, wrapper.getStream(), (int) wrapper.getSize());
+ } else if (p instanceof InputStream) {
+ // NOTE(review): a length of -1 presumably means "unknown";
+ // whether the driver accepts it is not visible here - confirm
+ InputStream stream = (InputStream) p;
+ stmt.setBinaryStream(i + 1, stream, -1);
+ } else {
+ stmt.setObject(i + 1, p);
+ }
+ }
+ stmt.execute();
+ // not reached when execute() throws
+ resetStatement(stmt);
+ return stmt;
+ }
+
+ /**
+ * Writes an SQL exception to the log: a one-line summary (message, SQL
+ * state and vendor error code) on error level and the stack trace on debug
+ * level.
+ *
+ * @param message text to start the summary with, <code>null</code> is
+ * treated as the empty string
+ * @param se the exception to report
+ */
+ private void logException(String message, SQLException se) {
+ String prefix = (message != null) ? message : "";
+ StringBuffer summary = new StringBuffer(prefix);
+ summary.append(", reason: ").append(se.getMessage());
+ summary.append(", state/code: ").append(se.getSQLState());
+ summary.append("/").append(se.getErrorCode());
+ log.error(summary.toString());
+ log.debug(" dump:", se);
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ConnectionManager.class);
+}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionProvider.java (revision 0)
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.bundle.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.jcr.RepositoryException;
+
+
+/**
+ * Interface responsible for providing SQL {@link Connection}s.
+ * <p>
+ * Implementation of this class can do connection pooling, in which case it must
+ * take {@link ConnectionProperties} into account and only return connection
+ * that matches the specified properties.
+ */
+public interface ConnectionProvider {
+
+ /**
+ * Returns connection with given properties.
+ *
+ * @param properties
+ * connection properties
+ * @return SQL {@link Connection}
+ *
+ * @throws RepositoryException
+ * @throws SQLException
+ */
+ Connection getConnection(ConnectionProperties properties) throws RepositoryException, SQLException;
+
+ /**
+ * Closes the given connection. Classes that obtain connections through
+ * {@link ConnectionProvider} must never call {@link Connection#close()}
+ * directly. They are required to call {@link #closeConnection(Connection)}
+ * instead.
+ *
+ * @param connection
+ * SQL {@link Connection}
+ *
+ * @throws SQLException
+ */
+ void closeConnection(Connection connection) throws SQLException;
+
+ /**
+ * Invoked when the repository is being shut down.
+ *
+ * @throws RepositoryException
+ */
+ void dispose() throws RepositoryException;
+
+ /**
+ * Bean that holds properties necessary to create or identify a SQL
+ * {@link Connection}.
+ * <p>
+ * Note that this class properly implements {@link #equals(Object)} and
+ * {@link #hashCode()} methods so it can be used as key in maps.
+ * The properties are mutable, so they must not be changed while an
+ * instance is in use as a map key.
+ */
+ public static final class ConnectionProperties {
+ private String user;
+ private String password;
+ private String url;
+ private String driver;
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public void setDriver(String driver) {
+ this.driver = driver;
+ }
+
+ public String getDriver() {
+ return driver;
+ }
+
+ /** Null-safe string comparison. */
+ private boolean equals(String s1, String s2) {
+ return s1 == s2 || (s1 != null && s1.equals(s2));
+ }
+
+ /**
+ * Two instances are equal when user, password, url and driver are
+ * pairwise equal (<code>null</code> only equals <code>null</code>).
+ */
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ConnectionProperties)) {
+ return false;
+ }
+ ConnectionProperties cp = (ConnectionProperties) obj;
+ return equals(user, cp.user) && equals(password, cp.password)
+ && equals(url, cp.url) && equals(driver, cp.driver);
+
+ }
+
+ /** Null-safe hash code, 0 for <code>null</code>. */
+ private int hashCode(String s) {
+ return s != null ? s.hashCode() : 0;
+ }
+
+ /**
+ * Combines the hash codes of all four properties with distinct prime
+ * weights, consistent with {@link #equals(Object)}.
+ */
+ public int hashCode() {
+ // "373 * hashCode(url)": the former "373 & ..." bit-masked the two
+ // halves of the sum against each other because of operator precedence
+ return hashCode(user) + 37 * hashCode(password)
+ + 373 * hashCode(url) + 1187 * hashCode(driver);
+ }
+ }
+}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/SimplePoolingConnectionProvider.java (revision 0)
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.bundle.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jcr.RepositoryException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link ConnectionProvider} implementation that pools the database
+ * {@link Connection}s. This is a proof of concept implementation and should
+ * not be used for production.
+ * <p>
+ * One pool is kept per database driver name. Connections that are checked
+ * out are tracked in <code>connectionToPool</code> so that
+ * {@link #closeConnection(Connection)} can find the pool they belong to.
+ */
+public class SimplePoolingConnectionProvider implements ConnectionProvider {
+
+ public SimplePoolingConnectionProvider() {
+
+ }
+
+ /**
+ * Returns the connection to the pool it was taken from. A connection that
+ * is not currently checked out from this provider is only reported with a
+ * warning.
+ *
+ * @param connection the connection to return, <code>null</code> is ignored
+ */
+ public void closeConnection(Connection connection) throws SQLException {
+ if (connection != null) {
+ Pool pool = null;
+ synchronized (this) {
+ pool = (Pool) connectionToPool.remove(connection);
+ }
+ if (pool == null) {
+ log.warn("Trying to close connection not opened by this ConnectionManager");
+ } else {
+ pool.returnConnection(connection);
+ }
+ }
+ }
+
+ /**
+ * Closes all pooled connections and forgets every pool.
+ */
+ public synchronized void dispose() throws RepositoryException {
+ connectionToPool.clear();
+ for (Iterator i = driverToPool.values().iterator(); i.hasNext();) {
+ Pool pool = (Pool) i.next();
+ pool.dispose();
+ }
+ // drop the disposed pools as well; they only hold closed connections
+ driverToPool.clear();
+ }
+
+ /**
+ * Checks a connection with the given properties out of the pool of the
+ * requested driver, creating the pool on first use. The returned
+ * connection is in auto-commit mode.
+ *
+ * @param properties connection properties, must not be <code>null</code>
+ * and must name a driver
+ * @return a connection matching the properties
+ * @throws IllegalArgumentException if the properties or the driver are <code>null</code>
+ */
+ public Connection getConnection(ConnectionProperties properties)
+ throws RepositoryException, SQLException {
+
+ if (properties == null) {
+ throw new IllegalArgumentException("Argument 'properties' may not be null.");
+ }
+ if (properties.getDriver() == null) {
+ throw new IllegalArgumentException("Database driver may not be null.");
+ }
+
+ Pool pool;
+ synchronized (this) {
+ pool = (Pool) driverToPool.get(properties.getDriver());
+ if (pool == null) {
+ pool = new Pool();
+ driverToPool.put(properties.getDriver(), pool);
+ }
+ }
+ // may wait for a free connection, so it is done outside the lock
+ Connection c = pool.getConnection(properties);
+ synchronized (this) {
+ connectionToPool.put(c, pool);
+ }
+
+ if (!c.getAutoCommit()) {
+ c.setAutoCommit(true);
+ }
+
+ return c;
+ }
+
+ private int minConnections = 10;
+ private int maxConnections = 20;
+ private boolean block = true;
+
+ /**
+ * Sets the maximal amount of pooled connections. If there is demand for more connections,
+ * the manager will either block (if {@link #setBlock(boolean)} is true) or throw an exception
+ * (if {@link #setBlock(boolean)} is false).
+ *
+ * @param maxConnections
+ */
+ public void setMaxConnections(int maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+
+ /**
+ * Returns the maximal amount of pooled connections.
+ * @return
+ */
+ public int getMaxConnections() {
+ return maxConnections;
+ }
+
+ /**
+ * Returns the minimal amount of pooled connections.
+ * @return
+ */
+ public int getMinConnections() {
+ return minConnections;
+ }
+
+ /**
+ * Sets the minimal amount of pooled connections. If connection is returned to the pool
+ * and there are more connections in pool than minimal amount of pooled connection,
+ * the least recently accessed available connection will be removed.
+ * @param minConnections
+ */
+ public void setMinConnections(int minConnections) {
+ this.minConnections = minConnections;
+ }
+
+ /**
+ * Sets the blocking behavior when there is no available connection in pool.
+ * If block is true, the requesting thread will be blocked, otherwise
+ * if block is false, an exception will be thrown.
+ * @param block
+ */
+ public void setBlock(boolean block) {
+ this.block = block;
+ }
+
+ public boolean isBlock() {
+ return block;
+ }
+
+ private Map driverToPool = new HashMap();
+ private Map connectionToPool = new HashMap();
+
+ /**
+ * Simple Pool implementation. All access to <code>entries</code> is
+ * guarded by synchronizing on the list itself.
+ */
+ private class Pool {
+
+ private Pool() {
+
+ }
+
+ private final List entries = new ArrayList();
+
+ /**
+ * Entry for connection in pool
+ */
+ private class ConnectionEntry {
+
+ private ConnectionEntry(Connection connection,
+ ConnectionProperties connectionProperties, boolean available) {
+ this.connection = connection;
+ this.connectionProperties = connectionProperties;
+ this.available = available;
+ touch();
+ }
+
+ private final Connection connection;
+ private final ConnectionProperties connectionProperties;
+ private boolean available;
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public ConnectionProperties getConnectionProperties() {
+ return connectionProperties;
+ }
+
+ public void setAvailable(boolean available) {
+ this.available = available;
+ }
+
+ public boolean isAvailable() {
+ return available;
+ }
+
+ public void touch() {
+ lastAccessed = System.currentTimeMillis();
+ }
+
+ public long getLastAccessed() {
+ return lastAccessed;
+ }
+
+ private long lastAccessed;
+ };
+
+ /**
+ * Makes a single attempt to check out a connection: reuses an
+ * available one with equal properties, otherwise opens a new one if
+ * the pool has room (after evicting an idle connection if necessary).
+ *
+ * @return the connection, or <code>null</code> if the pool is
+ * exhausted and blocking is enabled
+ * @throws RepositoryException if the pool is exhausted and blocking
+ * is disabled
+ */
+ private Connection tryGetConnection(ConnectionProperties properties) throws RepositoryException, SQLException {
+ synchronized (entries) {
+ Iterator i = entries.iterator();
+ while (i.hasNext()) {
+ ConnectionEntry e = (ConnectionEntry) i.next();
+ if (e.isAvailable()
+ && e.getConnectionProperties().equals(properties)) {
+ e.setAvailable(false);
+ e.touch();
+ return e.getConnection();
+ }
+ }
+
+ if (entries.size() >= maxConnections) {
+ // try remove at least one connection
+ removeOldestAvailableEntry();
+ }
+
+ if (entries.size() >= maxConnections) {
+ if (!isBlock()) {
+ throw new RepositoryException(
+ "Couldn't get any more database connections.");
+ } else {
+ return null;
+ }
+ }
+
+ Connection connection = ConnectionFactory.getConnection(
+ properties.getDriver(), properties.getUrl(), properties
+ .getUser(), properties.getPassword());
+ ConnectionEntry entry = new ConnectionEntry(connection, properties, false);
+ entries.add(entry);
+ return connection;
+ }
+ }
+
+ /**
+ * Checks out a connection, polling every 500 ms while the pool is
+ * exhausted.
+ *
+ * @throws RepositoryException if the pool is exhausted and blocking
+ * is disabled, or if the waiting thread is interrupted
+ */
+ private Connection getConnection(ConnectionProperties properties) throws RepositoryException, SQLException {
+ Connection connection = null;
+ while (connection == null) {
+ connection = tryGetConnection(properties);
+ if (connection == null) {
+ try {
+ final int sleep = 500;
+ log.info("No available connections in pool, waiting " + sleep + " ms");
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ // keep the interrupt visible to the caller and stop
+ // waiting instead of silently looping on
+ Thread.currentThread().interrupt();
+ throw new RepositoryException(
+ "Interrupted while waiting for a database connection.", e);
+ }
+ }
+ }
+ return connection;
+ }
+
+ /**
+ * Closes and removes the available entry that was accessed least
+ * recently, if there is one. Callers hold the lock on
+ * <code>entries</code>.
+ */
+ private void removeOldestAvailableEntry() {
+ ConnectionEntry oldest = null;
+
+ for (Iterator i = entries.iterator(); i.hasNext();) {
+ ConnectionEntry entry = (ConnectionEntry) i.next();
+ if (entry.isAvailable() && (oldest == null || oldest.getLastAccessed() > entry.getLastAccessed())) {
+ oldest = entry;
+ }
+ }
+
+ if (oldest != null) {
+ entries.remove(oldest);
+ try {
+ oldest.getConnection().close();
+ } catch (SQLException e) {
+ log.error("Error closing connection", e);
+ }
+ }
+ }
+
+ /**
+ * Takes a connection back into the pool. Pending work of a
+ * transactional connection is rolled back and auto-commit is
+ * restored; a connection that is already closed is dropped from the
+ * pool instead of being offered again.
+ */
+ private void returnConnection(Connection connection) {
+ boolean closed = false;
+ try {
+ closed = connection.isClosed();
+ if (closed) {
+ log.error("Connection is closed!");
+ } else if (!connection.getAutoCommit()) {
+ // rollback() is only legal outside auto-commit mode
+ connection.rollback();
+ connection.setAutoCommit(true);
+ }
+ } catch (SQLException e) {
+ log.error("Error rollbacking connection", e);
+ } finally {
+ synchronized (entries) {
+ for (Iterator i = entries.iterator(); i.hasNext();) {
+ ConnectionEntry entry = (ConnectionEntry) i.next();
+ if (entry.connection == connection) {
+ if (closed) {
+ // never hand a dead connection out again
+ i.remove();
+ } else {
+ entry.setAvailable(true);
+ if (entries.size() > getMinConnections()) {
+ removeOldestAvailableEntry();
+ }
+ }
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Closes every connection of this pool and empties it.
+ */
+ private void dispose() {
+ synchronized (entries) {
+ for (Iterator i = entries.iterator(); i.hasNext();) {
+ ConnectionEntry entry = (ConnectionEntry) i.next();
+ try {
+ entry.getConnection().close();
+ } catch (SQLException e) {
+ log.error("Error closing connection ", e);
+ }
+ }
+ entries.clear();
+ }
+ }
+ };
+
+ private static final Logger log = LoggerFactory
+ .getLogger(SimplePoolingConnectionProvider.class);
+}
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/StreamWrapper.java (revision 0)
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.bundle.util;
+
+import java.io.InputStream;
+
+/** Simple value holder that carries an InputStream together with a caller-supplied size. */
+public class StreamWrapper {
+    /** Size reported for the wrapped stream; stored as given, never computed here. */
+    private final long size;
+    /** The wrapped stream; kept by reference, neither copied nor buffered. */
+    private final InputStream stream;
+
+    /**
+     * Builds a holder around the given stream so that stream and size can be passed
+     * safely as a parameter to the executeStmt methods of {@link ConnectionRecoveryManager}.
+     * @param in the InputStream to wrap
+     * @param size the size of the input stream
+     */
+    public StreamWrapper(InputStream in, long size) {
+        this.size = size;
+        this.stream = in;
+    }
+    /** @return the size value passed to the constructor */
+    public long getSize() {
+        return size;
+    }
+    /** @return the very stream instance passed to the constructor */
+    public InputStream getStream() {
+        return stream;
+    }
+}
\ No newline at end of file
Index: src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ThreadLocalConnectionProviderAdapter.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ThreadLocalConnectionProviderAdapter.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ThreadLocalConnectionProviderAdapter.java (revision 0)
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.bundle.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jcr.RepositoryException;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Adapter for {@link ConnectionProvider} that binds a connection to the current thread.
+ * The first call to {@link #getConnection(ConnectionProperties)} binds the connection to the current thread.
+ * All subsequent calls to {@link #getConnection(ConnectionProperties)} return the same connection
+ * instance, until {@link #closeConnection(Connection)} is called the appropriate number of times.
+ */
+public class ThreadLocalConnectionProviderAdapter implements ConnectionProvider {
+
+ private final ConnectionProvider delegate; // underlying provider handed to the constructor; assigned exactly once
+
+ public ThreadLocalConnectionProviderAdapter(ConnectionProvider delegate) { // wraps the given provider
+ this.delegate = delegate; // stored as-is; no null check is performed here
+ }
+
+ public void closeConnection(Connection connection) throws SQLException {
+
+ ConnectionEntry e = (ConnectionEntry) connectionToEntry.get(connection);
+ if (e == null) {
+ throw new IllegalStateException(
+ "Trying to close unknown connection.");
+ }
+
+ if (e.usageCount == 0) {
+ throw new IllegalStateException(
+ "Trying to close already closed connection.");
+ }
+
+ --e.usageCount;
+ if (e.usageCount == 0) {
+
+ // try to remove the connection from map in current thread
+ Map/*