### Eclipse Workspace Patch 1.0 #P jackrabbit-core Index: src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java (revision 1241895) +++ src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java (working copy) @@ -16,11 +16,20 @@ */ package org.apache.jackrabbit.core.util.db; +import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; +import java.sql.SQLException; +import org.apache.jackrabbit.core.data.db.TempFileInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class StreamWrapper { - private final InputStream stream; + static Logger log = LoggerFactory.getLogger(StreamWrapper.class); + + private InputStream stream; private final long size; /** @@ -28,7 +37,8 @@ * safely be passed as a parameter to the {@link ConnectionHelper#exec(String, Object...)}, * {@link ConnectionHelper#exec(String, Object[], boolean, int)} and * {@link ConnectionHelper#update(String, Object[])} methods. - * + * If the wrapped Stream is a {@link TempFileInputStream} it will be wrapped again by a {@link BufferedInputStream}. + * * @param in the InputStream to wrap * @param size the size of the input stream */ @@ -38,10 +48,46 @@ } public InputStream getStream() { + if (stream instanceof TempFileInputStream) { + return new BufferedInputStream(stream); + } return stream; } public long getSize() { return size; } + + public void cleanupResources() { + if (stream instanceof TempFileInputStream) { + try { + stream.close(); + ((TempFileInputStream) stream).deleteFile(); + } catch (IOException e) { + log.warn("Unable to cleanup the TempFileInputStream"); + } + } + } + + /** + * Resets the internal InputStream that it could be re-read.
+ * Is used from {@link RetryManager} if a {@link SQLException} has occurred.
+ * At the moment only a {@link TempFileInputStream} can be reseted. + * + * @return returns true if it was able to reset the Stream + */ + public boolean resetStream() { + if (stream instanceof TempFileInputStream) { + try { + TempFileInputStream tempFileInputStream = (TempFileInputStream) stream; + // Close it if it is not already closed ... + tempFileInputStream.close(); + stream = new TempFileInputStream(tempFileInputStream.getFile()); + return true; + } catch (Exception e) { + log.warn("Failed to create a new TempFileInputStream", e); + } + } + return false; + } } Index: src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (revision 1241895) +++ src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (working copy) @@ -346,9 +346,8 @@ wrapper = new StreamWrapper(in, Integer.MAX_VALUE); } else if (STORE_TEMP_FILE.equals(storeStream)) { File temp = moveToTempFile(in); - fileInput = new BufferedInputStream(new TempFileInputStream(temp)); long length = temp.length(); - wrapper = new StreamWrapper(fileInput, length); + wrapper = new StreamWrapper(new TempFileInputStream(temp), length); } else { throw new DataStoreException("Unsupported stream store algorithm: " + storeStream); } Index: src/main/java/org/apache/jackrabbit/core/util/db/DbUtility.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/util/db/DbUtility.java (revision 1291582) +++ src/main/java/org/apache/jackrabbit/core/util/db/DbUtility.java (working copy) @@ -71,7 +71,7 @@ logException("failed to close Statement", e); } finally { try { - if (con != null) { + if (con != null && !con.isClosed()) { con.close(); } } catch (SQLException e) { Index: src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java (revision 1241895) +++ src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java (working copy) @@ -278,7 +278,7 @@ * @throws SQLException on error */ public final void exec(final String sql, final Object... params) throws SQLException { - new RetryManager() { + new RetryManager(params) { @Override protected Void call() throws SQLException { @@ -316,7 +316,7 @@ * @throws SQLException on error */ public final int update(final String sql, final Object... params) throws SQLException { - return new RetryManager() { + return new RetryManager(params) { @Override protected Integer call() throws SQLException { @@ -363,11 +363,11 @@ */ public final ResultSet exec(final String sql, final Object[] params, final boolean returnGeneratedKeys, final int maxRows) throws SQLException { - return new RetryManager() { + return new RetryManager(params) { @Override protected ResultSet call() throws SQLException { - return reallyExec(sql, params, returnGeneratedKeys, maxRows); + return reallyExec(sql, params, returnGeneratedKeys, maxRows); } }.doTry(); @@ -462,7 +462,6 @@ protected PreparedStatement execute(PreparedStatement stmt, Object[] params) throws SQLException { for (int i = 0; params != null && i < params.length; i++) { Object p = params[i]; - // FIXME: what about already consumed input streams when in a retry? if (p instanceof StreamWrapper) { StreamWrapper wrapper = (StreamWrapper) p; stmt.setBinaryStream(i + 1, wrapper.getStream(), (int) wrapper.getSize()); @@ -470,17 +469,39 @@ stmt.setObject(i + 1, p); } } - stmt.execute(); + try { + stmt.execute(); + } catch (SQLException e) { + //Reset Stream for retry ... + for (int i = 0; params != null && i < params.length; i++) { + Object p = params[i]; + if (p instanceof StreamWrapper) { + StreamWrapper wrapper = (StreamWrapper) p; + if(!wrapper.resetStream()) { + wrapper.cleanupResources(); + throw new RuntimeException("Unable to reset the Stream."); + } + } + } + throw e; + } return stmt; } /** * This class encapsulates the logic to retry a method invocation if it threw an SQLException. + * The RetryManager must cleanup the Params it will get. * * @param the return type of the method which is retried if it failed */ public abstract class RetryManager { + private Object[] params; + + public RetryManager(Object[] params) { + this.params = params; + } + public final T doTry() throws SQLException { if (inBatchMode()) { return call(); @@ -490,7 +511,9 @@ SQLException lastException = null; while (!sleepInterrupted && (blockOnConnectionLoss || failures <= RETRIES)) { try { - return call(); + T object = call(); + cleanupParamResources(); + return object; } catch (SQLException e) { lastException = e; } @@ -507,10 +530,26 @@ } } } + cleanupParamResources(); throw lastException; } } protected abstract T call() throws SQLException; + + /** + * Cleans up the Parameter resources that are not automatically closed or deleted. + * + * @param params + */ + protected void cleanupParamResources() { + for (int i = 0; params != null && i < params.length; i++) { + Object p = params[i]; + if (p instanceof StreamWrapper) { + StreamWrapper wrapper = (StreamWrapper) p; + wrapper.cleanupResources(); + } + } + } } } Index: src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java (revision 1241895) +++ src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java (working copy) @@ -67,7 +67,15 @@ this.file = file; } - private int closeIfEOF(int read) throws IOException { + public File getFile() { + return file; + } + + public void deleteFile() { + file.delete(); + } + + private int closeIfEOF(int read) throws IOException { if (read < 0) { close(); } @@ -77,7 +85,6 @@ public void close() throws IOException { if (!closed) { in.close(); - file.delete(); closed = true; } }