Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java =================================================================== --- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java (revision 886191) +++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java Thu Mar 18 10:35:55 CET 2010 @@ -16,82 +16,127 @@ */ package org.apache.jackrabbit.core.data.db; -import java.io.EOFException; -import java.io.IOException; -import java.sql.ResultSet; - -import org.apache.commons.io.input.AutoCloseInputStream; import org.apache.jackrabbit.core.data.DataIdentifier; import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.core.util.db.DbUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.ResultSet; + /** * This class represents an input stream backed by a database. The database * objects are only acquired when reading from the stream, and stay open until * the stream is closed, fully read, or garbage collected. */ -public class DbInputStream extends AutoCloseInputStream { +public class DbInputStream extends InputStream { private static Logger log = LoggerFactory.getLogger(DbInputStream.class); protected DbDataStore store; protected DataIdentifier identifier; - protected boolean endOfStream; protected ResultSet rs; - + + /** + * All bytes read after a call to mark() will be buffered in case the + * underlying stream is auto-closed by an exhausting read(). + */ + private byte[] mirrorBuffer; /** + * The position of the next byte to be written into the mirrorBuffer. + */ + private int mirrorPos; + + /** + * When mark() is called, mirroring is turned on (i.e. set to true). After + * a call to reset() mirroring is turned off again. + */ + private boolean mirroring = false; + + + /** + * Flag indicating whether we should read from the mirroredStream or from originalStream. + */ + private boolean usingMirror = false; + + + /** + * The original underlying InputStream. + */ + private InputStream originalStream; + + /** + * InputStream which is created when mark() has previously been called and + * the originalStream has been read exhaustively. This guards from an + * underlying stream being auto-closed during an exhaustive read(). + */ + private InputStream mirroredStream; + private boolean closed = false; + + /** * Create a database input stream for the given identifier. * Database access is delayed until the first byte is read from the stream. - * + * - * @param store the database data store + * @param store the database data store * @param identifier the data identifier */ protected DbInputStream(DbDataStore store, DataIdentifier identifier) { - super(null); + super(); this.store = store; this.identifier = identifier; } /** - * Open the stream if required. + * Utility method to get the correct input stream and to centrally handle + * the closed stream case. - * + * - * @throws IOException + * @return The currently active input stream. This depends on the usingMirror + * flag. + * @throws IOException An IOException is thrown if the close() method has + * been called. */ - protected void openStream() throws IOException { - if (endOfStream) { - throw new EOFException(); + private InputStream getInputStream() throws IOException { + if (closed) { + throw new IOException("Stream closed."); } - if (in == null) { - try { - in = store.openStream(this, identifier); - } catch (DataStoreException e) { - IOException e2 = new IOException(e.getMessage()); - e2.initCause(e); - throw e2; + if (usingMirror) { + ensureMirroredStream(); + return mirroredStream; + } else { + ensureOriginalStream(); + return originalStream; - } - } + } + } - } /** * {@inheritDoc} * When the stream is consumed, the database objects held by the instance are closed. */ public int read() throws IOException { - if (endOfStream) { - return -1; + // switch back from mirror to check if we can continue reading from + // the original stream + if (!closed && usingMirror && 0 == getInputStream().available()) { + stopUsingMirror(); } - openStream(); - int c = in.read(); - if (c == -1) { - endOfStream = true; - close(); + + int nextByte = getInputStream().read(); + + // free underlying resources + if (!usingMirror && -1 == nextByte) { + closeOriginalStream(); } - return c; + + // copy byte to the mirrorBuffer + if (-1 < nextByte && mirroring) { + mirrorBuffer[mirrorPos++] = (byte) nextByte; - } + } + return nextByte; + } /** * {@inheritDoc} @@ -101,111 +146,197 @@ return read(b, 0, b.length); } + /** * {@inheritDoc} * When the stream is consumed, the database objects held by the instance are closed. */ public int read(byte[] b, int off, int len) throws IOException { - if (endOfStream) { - return -1; + final int desiredBytes = len - off; + int length = 0; + + if (usingMirror) { + // read available bytes from mirroredStream + InputStream is = getInputStream(); + length = is.read(b, off, len); } - openStream(); - int c = in.read(b, off, len); - if (c == -1) { - endOfStream = true; - close(); + + if (!closed && length < desiredBytes) { + stopUsingMirror(); + // read the rest of the bytes from originalStream + InputStream is = getInputStream(); + length += is.read(b, off + length, len - length); } - return c; + + // free underlying resources + if (!usingMirror && 0 >= length) { + closeOriginalStream(); - } + } + // copy byte to the mirrorBuffer + if (length > 0 && mirroring) { + int freeBytes = mirrorBuffer.length - mirrorPos; + if (length <= freeBytes) { + System.arraycopy(b, 0, mirrorBuffer, mirrorPos, length); + mirrorPos += length; + } else { + // buffer exceeded + stopMirroring(); + } + } + return length; + } + /** * {@inheritDoc} * When the stream is consumed, the database objects held by the instance are closed. */ public void close() throws IOException { - if (in != null) { - in.close(); - in = null; + if (!closed) { + closed = true; + closeOriginalStream(); // some additional database objects // may need to be closed if (rs != null) { DbUtility.close(rs); rs = null; } + mirroredStream = null; + stopMirroring(); } } + private void closeOriginalStream() throws IOException { + if (null != originalStream) { + originalStream.close(); + originalStream = null; + } + } + /** * {@inheritDoc} */ public long skip(long n) throws IOException { - if (endOfStream) { - return -1; + long skipped; + if (mirroring) { + // multiple read to cover for larger range of long + log.debug("Skipping by reading bytes from original stream."); + skipped = read(new byte[(int) n]); + } else { + skipped = getInputStream().skip(n); - } + } - openStream(); - return in.skip(n); + return skipped; } /** * {@inheritDoc} */ public int available() throws IOException { - if (endOfStream) { - return 0; + return getInputStream().available(); - } + } - openStream(); - return in.available(); - } /** * {@inheritDoc} */ - public void mark(int readlimit) { - if (endOfStream) { - return; - } + public void mark(int readLimit) { try { - openStream(); + if (usingMirror) { + log.debug("Marking mirrored input stream."); + getInputStream().mark(readLimit); + } else { + log.debug("Start mirroring due to mark()."); + startMirroring(readLimit); + } } catch (IOException e) { log.info("Error getting underlying stream: ", e); } - in.mark(readlimit); } /** * {@inheritDoc} */ public void reset() throws IOException { - if (endOfStream) { - throw new EOFException(); + if (usingMirror) { + log.debug("Resetting mirrored input stream."); + getInputStream().reset(); + } else if (mirroring) { + log.debug("Start using mirror due to reset()."); + startUsingMirror(); - } + } - openStream(); - in.reset(); } + /** * {@inheritDoc} */ public boolean markSupported() { - if (endOfStream) { - return false; + return true; - } + } - try { - openStream(); - } catch (IOException e) { - log.info("Error getting underlying stream: ", e); - return false; - } - return in.markSupported(); - } /** * Set the result set of this input stream. This object must be closed once * the stream is closed. - * + * * @param rs the result set */ void setResultSet(ResultSet rs) { this.rs = rs; } + + + private void ensureMirroredStream() { + if (null == mirroredStream) { + throw new IllegalStateException("Mirrored stream is null."); -} + } + } + + private void ensureOriginalStream() throws IOException { + if (null == originalStream) { + try { + log.debug("Opening underlying stream."); + originalStream = store.openStream(this, identifier); + } catch (DataStoreException e) { + IOException wrapped = new IOException(e.getMessage()); + wrapped.initCause(e); + throw wrapped; + } + } + } + + private void startUsingMirror() { + if (usingMirror) { + throw new IllegalStateException("Already using mirror. Cannot start using it again."); + } + log.debug("Start using mirror."); + mirroredStream = new ByteArrayInputStream(mirrorBuffer, 0, mirrorPos); + usingMirror = true; + stopMirroring(); + } + + + private void stopUsingMirror() { + if (!usingMirror) { + throw new IllegalStateException("Cannot stop using mirror when it is not used."); + } + log.debug("Stop using mirror."); + usingMirror = false; + mirroredStream = null; + } + + private void startMirroring(int bufferLength) { + if (usingMirror) { + throw new IllegalStateException("Cannot start mirroring while using mirror."); + } + log.debug("Start mirroring input stream."); + mirroring = true; + mirrorBuffer = new byte[bufferLength]; + mirrorPos = 0; + } + + private void stopMirroring() { + log.debug("Stop mirroring input stream."); + mirroring = false; + mirrorBuffer = null; + mirrorPos = -1; + } +} Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/DBDataStoreTest.java =================================================================== --- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/DBDataStoreTest.java (revision 783386) +++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/DBDataStoreTest.java Thu Mar 18 09:02:22 CET 2010 @@ -19,17 +19,24 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.InputStream; +import java.io.PrintWriter; import java.util.Random; import junit.framework.TestCase; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.core.data.db.DbDataStore; +import org.apache.jackrabbit.core.data.db.DbInputStream; +import org.apache.jackrabbit.core.util.db.ConnectionFactory; +import org.apache.jackrabbit.test.JUnitTest; +import org.apache.jackrabbit.test.LogPrintWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test the Database Data Store. */ -public class DBDataStoreTest extends TestCase { +public class DBDataStoreTest extends JUnitTest { private DbDataStore store = new DbDataStore(); @@ -41,6 +48,7 @@ FileUtils.deleteQuietly(new File("target/test-db-datastore")); // Initialize the data store + store.setConnectionFactory(new ConnectionFactory()); store.setUrl("jdbc:derby:target/test-db-datastore/db;create=true"); store.setDriver("org.apache.derby.jdbc.EmbeddedDriver"); store.init("target/test-db-datastore"); @@ -82,6 +90,86 @@ } } + public void testDbInputStreamReset() throws Exception { + DataRecord record = store.getRecord(identifier); + InputStream in = record.getStream(); + try { + // test whether mark and reset works + assertTrue(in instanceof DbInputStream); + assertTrue(in.markSupported()); + in.mark(data.length); + while (-1 != in.read()); + assertTrue(in.markSupported()); + try { + in.reset(); + } catch (Exception e) { + fail("Unexpected exception while resetting input stream: " + e.getMessage()); + } + + // test integrity of replayed bytes + byte[] replayedBytes = new byte[data.length]; + int length = in.read(replayedBytes); + assertEquals(length, data.length); + + for (int i = 0; i < data.length; i++) { + log.append(i + " data: " + data[i] + " replayed: " + replayedBytes[i] + "\n"); + assertEquals(data[i], replayedBytes[i]); + } + + assertEquals(-1, in.read()); + + + } finally { + in.close(); + log.flush(); + } + } + + /* + public void testDbInputStreamMarkTwice() throws Exception { + DataRecord record = store.getRecord(identifier); + InputStream in = record.getStream(); + try { + // test whether mark and reset works + assertTrue(in instanceof DbInputStream); + assertTrue(in.markSupported()); + in.mark(data.length); + + // read first 100 bytes + for (int i = 0; i < 100; i++) { + in.read(); + } + + in.mark(data.length - 100); + + // read next 150 bytes + for (int i = 0; i < 150; i++) { + in.read(); + } + + try { + log.append("attempting a reset()\n"); + in.reset(); + } catch (Exception e) { + fail("Unexpected exception while resetting input stream: " + e.getMessage()); + } + + // test integrity of replayed bytes + byte[] replayedBytes = new byte[data.length]; + int length = in.read(replayedBytes); + assertEquals(length, data.length - 100 - 150); + + for (int i = 0; i < length; i++) { + assertEquals(data[i + 100 + 150] & 0xff, replayedBytes[i] & 0xff); + } + + assertTrue(-1 == in.read()); + } finally { + in.close(); + } + } + */ + public void testConcurrentRead() throws Exception { InputStream[] streams = new InputStream[10]; @@ -103,5 +191,4 @@ streams[i].close(); } } - }