Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/Result.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/Result.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/Result.java (working copy) @@ -212,7 +212,9 @@ */ public InputStream getBinaryStream(Object obj) throws SQLException; - + + public InputStream getLOBStream(JDBCStore store, Object obj) + throws SQLException; /** * Return the value stored in the given column or id; may not be supported * by results that are not backed by a SQL result set. Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/MergedResult.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/MergedResult.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/MergedResult.java (working copy) @@ -252,6 +252,11 @@ return _res[_idx].getBinaryStream(obj); } + public InputStream getLOBStream(JDBCStore store, Object obj) + throws SQLException { + return _res[_idx].getLOBStream(store, obj); + } + public Blob getBlob(Object obj) throws SQLException { return _res[_idx].getBlob(obj); Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/AbstractResult.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/AbstractResult.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/AbstractResult.java (working copy) @@ -49,6 +49,7 @@ import org.apache.openjpa.lib.util.Closeable; import org.apache.openjpa.meta.JavaTypes; import org.apache.openjpa.util.UnsupportedException; + import serp.util.Strings; /** @@ -341,12 +342,23 @@ return getBinaryStreamInternal(translate(col, joins), joins); } + public InputStream getLOBStream(JDBCStore store, Object obj) + throws SQLException { + return getLOBStreamInternal(store, translate(obj, null), null); + } + protected InputStream getBinaryStreamInternal(Object obj, Joins joins) throws SQLException { return (InputStream) checkNull(getObjectInternal(obj, JavaSQLTypes.BINARY_STREAM, null, joins)); } + protected InputStream getLOBStreamInternal(JDBCStore store, Object obj, + Joins joins) throws SQLException { + return (InputStream) checkNull(getStreamInternal(store, obj, + JavaSQLTypes.BINARY_STREAM, null, joins)); + } + public Blob getBlob(Object obj) throws SQLException { return getBlobInternal(translate(obj, null), null); @@ -670,6 +682,9 @@ Object arg, Joins joins) throws SQLException; + protected abstract Object getStreamInternal(JDBCStore store, Object obj, + int metaType, Object arg, Joins joins) throws SQLException; + public Object getSQLObject(Object obj, Map map) throws SQLException { return getSQLObjectInternal(translate(obj, null), map, null); Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java (working copy) @@ -60,6 +60,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; + import javax.sql.DataSource; import org.apache.commons.lang.StringUtils; @@ -101,10 +102,12 @@ import org.apache.openjpa.util.InternalException; import org.apache.openjpa.util.InvalidStateException; import org.apache.openjpa.util.OpenJPAException; +import org.apache.openjpa.util.ReferentialIntegrityException; import org.apache.openjpa.util.Serialization; import org.apache.openjpa.util.StoreException; import org.apache.openjpa.util.UnsupportedException; import org.apache.openjpa.util.UserException; + import serp.util.Numbers; import serp.util.Strings; @@ -483,6 +486,11 @@ return rs.getBinaryStream(column); } + public InputStream getLOBStream(JDBCStore store, ResultSet rs, + int column) throws SQLException { + return rs.getBinaryStream(column); + } + /** * Convert the specified column of the SQL ResultSet to the proper * java type. @@ -4166,11 +4174,11 @@ return column.toString(); } - public void insertBlobForStreamingLoad(Row row, Column col, Object ob) - throws SQLException { + public void insertBlobForStreamingLoad(Row row, Column col, + JDBCStore store, Object ob, Select sel) throws SQLException { if (ob != null) { row.setBinaryStream(col, - new ByteArrayInputStream(new byte[0]), 0); + new ByteArrayInputStream(new byte[0]), 0); } else { row.setNull(col); } @@ -4415,4 +4423,8 @@ } return false; } + + public void deleteStream(JDBCStore store, Select sel) throws SQLException { + //Do nothing + } } Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/ResultSetResult.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/ResultSetResult.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/ResultSetResult.java (working copy) @@ -42,6 +42,7 @@ import org.apache.openjpa.jdbc.meta.JavaSQLTypes; import org.apache.openjpa.jdbc.schema.Column; import org.apache.openjpa.meta.JavaTypes; + import serp.util.Numbers; /** @@ -345,6 +346,11 @@ return _dict.getLong(_rs, ((Number) obj).intValue()); } + protected Object getStreamInternal(JDBCStore store, Object obj, + int metaTypeCode, Object arg, Joins joins) throws SQLException { + return getLOBStreamInternal(store, obj, joins); + } + protected Object getObjectInternal(Object obj, int metaTypeCode, Object arg, Joins joins) throws SQLException { @@ -498,4 +504,9 @@ return 0; } } + + protected InputStream getLOBStreamInternal(JDBCStore store, Object obj, + Joins joins) throws SQLException { + return _dict.getLOBStream(store, _rs, ((Number) obj).intValue()); + } } Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/PostgresDictionary.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/PostgresDictionary.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/PostgresDictionary.java (working copy) @@ -18,6 +18,9 @@ */ package org.apache.openjpa.jdbc.sql; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -29,6 +32,7 @@ import java.util.Arrays; import java.util.Date; +import org.apache.openjpa.jdbc.kernel.JDBCStore; import org.apache.openjpa.jdbc.kernel.exps.FilterValue; import org.apache.openjpa.jdbc.schema.Column; import org.apache.openjpa.jdbc.schema.Sequence; @@ -36,6 +40,11 @@ import org.apache.openjpa.lib.jdbc.DelegatingConnection; import org.apache.openjpa.lib.jdbc.DelegatingPreparedStatement; import org.apache.openjpa.lib.util.Localizer; +import org.apache.openjpa.util.InternalException; +import org.apache.openjpa.util.StoreException; +import org.postgresql.PGConnection; +import org.postgresql.largeobject.LargeObject; +import org.postgresql.largeobject.LargeObjectManager; /** * Dictionary for Postgres. @@ -319,6 +328,151 @@ return new PostgresConnection(super.decorate(conn), this); } + public InputStream getLOBStream(JDBCStore store, ResultSet rs, + int column) throws SQLException { + DelegatingConnection conn = (DelegatingConnection)store + .getConnection(); + conn.setAutoCommit(false); + LargeObjectManager lom = ((PGConnection)conn.getInnermostDelegate()) + .getLargeObjectAPI(); + if (rs.getInt(column) != -1) { + LargeObject lo = lom.open(rs.getInt(column)); + return lo.getInputStream(); + } else { + return null; + } + } + + public void insertBlobForStreamingLoad(Row row, Column col, + JDBCStore store, Object ob, Select sel) throws SQLException { + if (row.getAction() == Row.ACTION_INSERT) { + insertPostgresBlob(row, col, store, ob); + } else if (row.getAction() == Row.ACTION_UPDATE) { + updatePostgresBlob(row, col, store, ob, sel); + } + } + + private void insertPostgresBlob(Row row, Column col, JDBCStore store, + Object ob) throws SQLException { + if (ob != null) { + col.setType(Types.INTEGER); + DelegatingConnection conn = (DelegatingConnection)store + .getConnection(); + try { + conn.setAutoCommit(false); + PGConnection pgconn = (PGConnection) conn.getInnermostDelegate(); + LargeObjectManager lom = pgconn.getLargeObjectAPI(); + // The create method is valid in versions previous 8.3 + // in 8.3 this methos is deprecated, use createLO + int oid = lom.create(); + LargeObject lo = lom.open(oid, LargeObjectManager.WRITE); + OutputStream os = lo.getOutputStream(); + copy((InputStream)ob, os); + lo.close(); + row.setInt(col, oid); + } catch (IOException ioe) { + throw new StoreException(ioe); + } finally { + conn.close(); + } + } else { + row.setInt(col, -1); + } + } + + private void updatePostgresBlob(Row row, Column col, JDBCStore store, + Object ob, Select sel) throws SQLException { + SQLBuffer sql = sel.toSelect(true, store.getFetchConfiguration()); + ResultSet res = null; + DelegatingConnection conn = + (DelegatingConnection) store.getConnection(); + PreparedStatement stmnt = null; + try { + stmnt = sql.prepareStatement(conn, store.getFetchConfiguration(), + ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); + res = stmnt.executeQuery(); + if (!res.next()) { + throw new InternalException(_loc.get("stream-exception")); + } + int oid = res.getInt(1); + if (oid != -1) { + conn.setAutoCommit(false); + PGConnection pgconn = (PGConnection)conn + .getInnermostDelegate(); + LargeObjectManager lom = pgconn.getLargeObjectAPI(); + if (ob != null) { + LargeObject lo = lom.open(oid, LargeObjectManager.WRITE); + OutputStream os = lo.getOutputStream(); + copy((InputStream)ob, os); + lo.close(); + } else { + lom.delete(oid); + row.setInt(col, -1); + } + } else { + if (ob != null) { + conn.setAutoCommit(false); + PGConnection pgconn = (PGConnection)conn + .getInnermostDelegate(); + LargeObjectManager lom = pgconn.getLargeObjectAPI(); + oid = lom.create(); + LargeObject lo = lom.open(oid, LargeObjectManager.WRITE); + OutputStream os = lo.getOutputStream(); + copy((InputStream)ob, os); + lo.close(); + row.setInt(col, oid); + } + } + + } catch (IOException ioe) { + throw new StoreException(ioe); + } finally { + if (res != null) + try { res.close (); } catch (SQLException e) {} + if (stmnt != null) + try { stmnt.close (); } catch (SQLException e) {} + if (conn != null) + try { conn.close (); } catch (SQLException e) {} + } + + } + + public void updateBlob(Select sel, JDBCStore store, InputStream is) + throws SQLException { + //Do nothing + } + + public void deleteStream(JDBCStore store, Select sel) throws SQLException { + SQLBuffer sql = sel.toSelect(true, store.getFetchConfiguration()); + ResultSet res = null; + DelegatingConnection conn = + (DelegatingConnection) store.getConnection(); + PreparedStatement stmnt = null; + try { + stmnt = sql.prepareStatement(conn, store.getFetchConfiguration(), + ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); + res = stmnt.executeQuery(); + if (!res.next()) { + throw new InternalException(_loc.get("stream-exception")); + } + int oid = res.getInt(1); + if (oid != -1) { + conn.setAutoCommit(false); + PGConnection pgconn = (PGConnection)conn + .getInnermostDelegate(); + LargeObjectManager lom = pgconn.getLargeObjectAPI(); + lom.delete(oid); + } + } finally { + if (res != null) + try { res.close (); } catch (SQLException e) {} + if (stmnt != null) + try { stmnt.close (); } catch (SQLException e) {} + if (conn != null) + try { conn.close (); } catch (SQLException e) {} + } + } + /** * Connection wrapper to work around the postgres empty result set bug. */ Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/MappedQueryResultObjectProvider.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/MappedQueryResultObjectProvider.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/MappedQueryResultObjectProvider.java (working copy) @@ -419,6 +419,13 @@ return _res.getSQLObject(obj, map); } + protected Object getStreamInternal(JDBCStore store, Object obj, + int metaTypeCode, Object arg, Joins joins) throws SQLException { + if (obj instanceof Column) + return _res.getObject((Column) obj, arg, joins); + return _res.getObject(obj, metaTypeCode, arg); + } + protected Ref getRefInternal(Object obj, Map map, Joins joins) throws SQLException { if (obj instanceof Column) Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/meta/strats/LobFieldStrategy.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/meta/strats/LobFieldStrategy.java (revision 655102) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/meta/strats/LobFieldStrategy.java (working copy) @@ -28,11 +28,13 @@ import org.apache.openjpa.jdbc.meta.FieldMapping; import org.apache.openjpa.jdbc.meta.ValueMappingInfo; import org.apache.openjpa.jdbc.schema.Column; +import org.apache.openjpa.jdbc.sql.PostgresDictionary; import org.apache.openjpa.jdbc.sql.Result; import org.apache.openjpa.jdbc.sql.Row; import org.apache.openjpa.jdbc.sql.RowManager; import org.apache.openjpa.jdbc.sql.Select; import org.apache.openjpa.kernel.OpenJPAStateManager; +import org.apache.openjpa.meta.JavaTypes; /** * Direct mapping from a stream value to a column. @@ -57,8 +59,9 @@ vinfo.assertNoForeignKey(field, !adapt); Column tmpCol = new Column(); tmpCol.setName(field.getName()); + tmpCol.setType(fieldType); tmpCol.setJavaType(field.getTypeCode()); - tmpCol.setType(fieldType); + tmpCol.setSize(-1); Column[] cols = vinfo.getColumns(field, field.getName(), @@ -74,15 +77,22 @@ return null; } + public void delete(OpenJPAStateManager sm, JDBCStore store, RowManager rm) + throws SQLException { + Select sel = createSelect(sm, store); + store.getDBDictionary().deleteStream(store, sel); + } + public void insert(OpenJPAStateManager sm, JDBCStore store, RowManager rm) throws SQLException { Object ob = toDataStoreValue(sm.fetchObjectField (field.getIndex()), store); Row row = field.getRow(sm, store, rm, Row.ACTION_INSERT); if (field.getColumnIO().isInsertable(0, ob == null)) { + Select sel = createSelect(sm, store); if (isBlob()) { store.getDBDictionary().insertBlobForStreamingLoad - (row, field.getColumns()[0], ob); + (row, field.getColumns()[0], store, ob, sel); } else { store.getDBDictionary().insertClobForStreamingLoad (row, field.getColumns()[0], ob); @@ -115,15 +125,16 @@ public void update(OpenJPAStateManager sm, JDBCStore store, RowManager rm) throws SQLException { Object ob = toDataStoreValue(sm.fetchObjectField - (field.getIndex()), store); + (field.getIndex()), store); if (field.getColumnIO().isUpdatable(0, ob == null)) { Row row = field.getRow(sm, store, rm, Row.ACTION_UPDATE); + Select sel = createSelect(sm, store); if (isBlob()) { store.getDBDictionary().insertBlobForStreamingLoad - (row, field.getColumns()[0], ob); + (row, field.getColumns()[0], store, ob, sel); } else { store.getDBDictionary().insertClobForStreamingLoad - (row, field.getColumns()[0], ob); + (row, field.getColumns()[0], sel); } } } @@ -164,7 +175,8 @@ Column col = field.getColumns()[0]; if (res.contains(col)) { if (isBlob()) { - sm.storeObject(field.getIndex(), res.getBinaryStream(col)); + sm.storeObject(field.getIndex(), + res.getLOBStream(store, col)); } else { sm.storeObject(field.getIndex(), res.getCharacterStream(col)); } @@ -177,16 +189,22 @@ } public void setFieldMapping(FieldMapping owner) { - if (owner.getType().isAssignableFrom(InputStream.class)) { - fieldType = Types.BLOB; - } else if (owner.getType().isAssignableFrom(Reader.class)) { - fieldType = Types.CLOB; + field = owner; + if (owner.getElementMapping().getMappingRepository().getDBDictionary() + instanceof PostgresDictionary) { + fieldType = Types.INTEGER; + field.setTypeCode(JavaTypes.INT); + } else { + if (owner.getType().isAssignableFrom(InputStream.class)) { + fieldType = Types.BLOB; + } else if (owner.getType().isAssignableFrom(Reader.class)) { + fieldType = Types.CLOB; + } } - field = owner; } private boolean isBlob() { - if (fieldType == Types.BLOB) + if (fieldType == Types.BLOB || fieldType == Types.INTEGER) return true; return false; } Index: openjpa-jdbc/pom.xml =================================================================== --- openjpa-jdbc/pom.xml (revision 655102) +++ openjpa-jdbc/pom.xml (working copy) @@ -48,6 +48,11 @@ hsqldb compile + + postgresql + postgresql + 8.1-407.jdbc3 + Index: openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/AbstractLobTest.java =================================================================== --- openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/AbstractLobTest.java (revision 655102) +++ openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/AbstractLobTest.java (working copy) @@ -30,6 +30,7 @@ import org.apache.openjpa.jdbc.sql.DBDictionary; import org.apache.openjpa.jdbc.sql.MySQLDictionary; import org.apache.openjpa.jdbc.sql.OracleDictionary; +import org.apache.openjpa.jdbc.sql.PostgresDictionary; import org.apache.openjpa.jdbc.sql.SQLServerDictionary; import org.apache.openjpa.meta.ClassMetaData; import org.apache.openjpa.persistence.JPAFacadeHelper; @@ -56,7 +57,8 @@ .getDBDictionaryInstance(); if (dict instanceof MySQLDictionary || dict instanceof SQLServerDictionary || - dict instanceof OracleDictionary) { + dict instanceof OracleDictionary || + dict instanceof PostgresDictionary) { return true; } return false; @@ -81,6 +83,7 @@ insert(newLobEntity(s, 1)); EntityManager em = emf.createEntityManager(); em.getTransaction().begin(); + Query query = em.createQuery(getSelectQuery()); LobEntity entity = (LobEntity) query.getSingleResult(); assertNotNull(entity.getStream()); @@ -169,7 +172,7 @@ em.getTransaction().commit(); em.close(); } - + public void testLifeCycleInsertFlushModify() { if (!isDatabaseSupported()) return; EntityManager em = emf.createEntityManager(); Index: openjpa-project/src/doc/manual/ref_guide_dbsetup.xml =================================================================== --- openjpa-project/src/doc/manual/ref_guide_dbsetup.xml (revision 655102) +++ openjpa-project/src/doc/manual/ref_guide_dbsetup.xml (working copy) @@ -2400,6 +2400,38 @@ sequence name. Defaults to a database-appropriate value. + + + + + BLOB + + + BlobBufferSize + + +BlobBufferSize: This property establish the buffer size in +the INSERT/UPDATE operations with an +java.io.InputStreamThis is only used in the +Stream LOB Support. Defaults to 50000. + + + + + + + CLOB + + + ClobBufferSize + + +ClobBufferSize: This property establish the buffer size in +the INSERT/UPDATE operations with a +java.io.ReaderThis is only used in the +Stream LOB Support. Defaults to 50000. + +
Index: openjpa-project/src/doc/manual/ref_guide_mapping.xml =================================================================== --- openjpa-project/src/doc/manual/ref_guide_mapping.xml (revision 655102) +++ openjpa-project/src/doc/manual/ref_guide_mapping.xml (working copy) @@ -2680,6 +2680,47 @@
+
+ + Stream LOB Support + + + + stream support + + + + + stream lob support + + + +Since the 1.1.0 release Apache OpenJPA added support for Streams. This feature +makes it possible to stream large amounts of data into and out of fields in +objects managed by OpenJPA without ever holding all the data in memory at the +same time. + + +To persist a stream, use the + +org.apache.openjpa.persistence.Persistent +annotation. + + + + Showing annotated InputStream + + +@Entity +public class Employee { + ... + @Persistent + private InputStream photoStream; + ... +} + + +
Key Columns