Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java (revision 567726) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java (working copy) @@ -20,9 +20,11 @@ import java.io.BufferedReader; import java.io.ByteArrayInputStream; +import java.io.CharArrayReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.io.Reader; import java.io.StringReader; import java.io.Writer; @@ -95,6 +97,7 @@ import org.apache.openjpa.meta.FieldMetaData; import org.apache.openjpa.meta.JavaTypes; import org.apache.openjpa.util.GeneralException; +import org.apache.openjpa.util.InternalException; import org.apache.openjpa.util.OpenJPAException; import org.apache.openjpa.util.ReferentialIntegrityException; import org.apache.openjpa.util.Serialization; @@ -123,6 +126,9 @@ public static final String CONS_NAME_BEFORE = "before"; public static final String CONS_NAME_MID = "mid"; public static final String CONS_NAME_AFTER = "after"; + + public static final int BLOB_BUFFER_SIZE = 50; + public static final int ClOB_BUFFER_SIZE = 50; protected static final int RANGE_POST_SELECT = 0; protected static final int RANGE_PRE_DISTINCT = 1; @@ -3869,4 +3875,103 @@ public String getVersionColumn(Column column, String tableAlias) { return column.toString(); } + + public void insertBlobForStreamingLoad(Row row, Column col) + throws SQLException { + row.setBinaryStream(col, + new ByteArrayInputStream(new byte[0]), 0); + } + + public void insertClobForStreamingLoad(Row row, Column col) + throws SQLException { + row.setCharacterStream(col, + new CharArrayReader(new char[0]), 0); + } + + public void updateBlob(Select sel, JDBCStore store, InputStream is) + throws SQLException { + SQLBuffer sql = sel.toSelect(true, store.getFetchConfiguration()); + ResultSet res = null; + Connection conn = store.getConnection(); + PreparedStatement stmnt = null; + try { + stmnt = sql.prepareStatement(conn, store.getFetchConfiguration(), + ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); + res = stmnt.executeQuery(); + if (!res.next()) { + // ##### add a meaningful message; maybe a + // ##### different exception type + throw new InternalException(); + } + Blob blob = res.getBlob(1); + OutputStream os = blob.setBinaryStream(1); + copy(is, os); + os.close(); + res.updateBlob(1, blob); + res.updateRow(); + + } catch (IOException e) { + throw new SQLException(); + } finally { + if (res != null) try { res.close (); } catch (SQLException e) {} + if (stmnt != null) try { stmnt.close (); } catch (SQLException e) {} + if (conn != null) try { conn.close (); } catch (SQLException e) {} + } + } + + public void updateClob(Select sel, JDBCStore store, Reader reader) + throws SQLException { + SQLBuffer sql = sel.toSelect(true, store.getFetchConfiguration()); + ResultSet res = null; + Connection conn = store.getConnection(); + PreparedStatement stmnt = null; + try { + stmnt = sql.prepareStatement(conn, store.getFetchConfiguration(), + ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); + res = stmnt.executeQuery(); + if (!res.next()) { + // ##### add a meaningful message; maybe a + // ##### different exception type + throw new InternalException(); + } + Clob clob = res.getClob(1); + Writer writer = clob.setCharacterStream(1); + copy(reader, writer); + writer.close(); + res.updateClob(1, clob); + res.updateRow(); + + } catch (IOException e) { + throw new SQLException(); + } finally { + if (res != null) try { res.close (); } catch (SQLException e) {} + if (stmnt != null) try { stmnt.close (); } catch (SQLException e) {} + if (conn != null) try { conn.close (); } catch (SQLException e) {} + } + } + + protected long copy(InputStream in, OutputStream out) throws IOException { + byte[] copyBuffer = new byte[BLOB_BUFFER_SIZE]; + long bytesCopied = 0; + int read = -1; + + while ((read = in.read(copyBuffer, 0, copyBuffer.length)) != -1) { + out.write(copyBuffer, 0, read); + bytesCopied += read; + } + return bytesCopied; + } + + protected long copy(Reader reader, Writer writer) throws IOException { + char[] copyBuffer = new char[ClOB_BUFFER_SIZE]; + long bytesCopied = 0; + int read = -1; + + while ((read = reader.read(copyBuffer, 0, copyBuffer.length)) != -1) { + writer.write(copyBuffer, 0, read); + bytesCopied += read; + } + + return bytesCopied; + } } Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/OracleDictionary.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/OracleDictionary.java (revision 567726) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/OracleDictionary.java (working copy) @@ -49,6 +49,7 @@ import org.apache.openjpa.lib.jdbc.DelegatingPreparedStatement; import org.apache.openjpa.lib.util.Localizer; import org.apache.openjpa.util.StoreException; + import serp.util.Numbers; /** @@ -1097,4 +1098,14 @@ val.appendTo(buf); buf.append("')"); } + + public void insertBlobForStreamingLoad(Row row, Column col) + throws SQLException { + row.setNull(col); + } + + public void insertClobForStreamingLoad(Row row, Column col) + throws SQLException { + row.setNull(col); + } } Index: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/meta/strats/LobFieldStrategy.java =================================================================== --- openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/meta/strats/LobFieldStrategy.java (revision 0) +++ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/meta/strats/LobFieldStrategy.java (revision 0) @@ -0,0 +1,189 @@ +/* +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.meta.strats; + +import java.io.InputStream; +import java.io.Reader; +import java.sql.SQLException; +import java.sql.Types; + +import org.apache.openjpa.jdbc.kernel.JDBCFetchConfiguration; +import org.apache.openjpa.jdbc.kernel.JDBCStore; +import org.apache.openjpa.jdbc.meta.FieldMapping; +import org.apache.openjpa.jdbc.meta.ValueMappingInfo; +import org.apache.openjpa.jdbc.schema.Column; +import org.apache.openjpa.jdbc.sql.Result; +import org.apache.openjpa.jdbc.sql.Row; +import org.apache.openjpa.jdbc.sql.RowManager; +import org.apache.openjpa.jdbc.sql.Select; +import org.apache.openjpa.kernel.OpenJPAStateManager; + +/** + * Direct mapping from a stream value to a column. + * + * @author Ignacio Andreu + * @since 1.0.0 + */ + +public class LobFieldStrategy extends AbstractFieldStrategy { + + private int fieldType; + + public void map(boolean adapt) { + assertNotMappedBy(); + field.mapJoin(adapt, false); + field.getKeyMapping().getValueInfo().assertNoSchemaComponents + (field.getKey(), !adapt); + field.getElementMapping().getValueInfo().assertNoSchemaComponents + (field.getElement(), !adapt); + + ValueMappingInfo vinfo = field.getValueInfo(); + vinfo.assertNoJoin(field, true); + vinfo.assertNoForeignKey(field, !adapt); + Column tmpCol = new Column(); + tmpCol.setName(field.getName()); + tmpCol.setType(fieldType); + tmpCol.setSize(-1); + + Column[] cols = vinfo.getColumns(field, field.getName(), + new Column[]{ tmpCol }, field.getTable(), adapt); + + field.setColumns(cols); + field.setColumnIO(vinfo.getColumnIO()); + field.mapConstraints(field.getName(), adapt); + field.mapPrimaryKey(adapt); + } + + public Boolean isCustomInsert(OpenJPAStateManager sm, JDBCStore store) { + return null; + } + + public void insert(OpenJPAStateManager sm, JDBCStore store, RowManager rm) + throws SQLException { + Object ob = toDataStoreValue(sm.fetchObjectField + (field.getIndex()), store); + Row row = field.getRow(sm, store, rm, Row.ACTION_INSERT); + if (field.getColumnIO().isInsertable(0, ob == null)) { + if (ob != null) { + if (isBlob()) { + store.getDBDictionary().insertBlobForStreamingLoad(row, field.getColumns()[0]); + } else { + store.getDBDictionary().insertClobForStreamingLoad(row, field.getColumns()[0]); + } + } else { + Column col = field.getColumns()[0]; + col.setType(Types.OTHER); + row.setNull(col); + } + } + } + + public void customInsert(OpenJPAStateManager sm, JDBCStore store) + throws SQLException { + Object ob = toDataStoreValue(sm.fetchObjectField + (field.getIndex()), store); + if (field.getColumnIO().isInsertable(0, ob == null)) { + if (ob != null) { + Select sel = createSelect(sm, store); + if (isBlob()) { + store.getDBDictionary().updateBlob(sel, store, (InputStream)ob); + } else { + store.getDBDictionary().updateClob(sel, store, (Reader)ob); + } + } + } + } + + public void update(OpenJPAStateManager sm, JDBCStore store, RowManager rm) + throws SQLException { + Object ob = toDataStoreValue(sm.fetchObjectField + (field.getIndex()), store); + if (field.getColumnIO().isUpdatable(0, ob == null)) { + if (ob != null) { + Select sel = createSelect(sm, store); + if (isBlob()) { + store.getDBDictionary().updateBlob(sel, store, (InputStream)ob); + } else { + store.getDBDictionary().updateClob(sel, store, (Reader)ob); + } + } else { + Row row = field.getRow(sm, store, rm, Row.ACTION_UPDATE); + Column col = field.getColumns()[0]; + col.setType(Types.OTHER); + row.setNull(col); + } + } + } + + public int supportsSelect(Select sel, int type, OpenJPAStateManager sm, + JDBCStore store, JDBCFetchConfiguration fetch) { + if (type == Select.TYPE_JOINLESS && sel.isSelected(field.getTable())) + return 1; + return 0; + } + + public int select(Select sel, OpenJPAStateManager sm, JDBCStore store, + JDBCFetchConfiguration fetch, int eagerMode) { + sel.select(field.getColumns()[0], field.join(sel)); + return 1; + } + + public void load(OpenJPAStateManager sm, JDBCStore store, + JDBCFetchConfiguration fetch, Result res) throws SQLException { + Column col = field.getColumns()[0]; + if (res.contains(col)) { + if (isBlob()) { + sm.storeObject(field.getIndex(), res.getBinaryStream(col)); + } else { + sm.storeObject(field.getIndex(), res.getCharacterStream(col)); + } + } + } + + protected void assertNotMappedBy() { + if (field != null && field.getMappedBy() != null) + throw new UnsupportedOperationException(); + } + + public void setFieldMapping(FieldMapping owner) { + if (owner.getType() == InputStream.class) { + fieldType = Types.BLOB; + } else if (owner.getType() == Reader.class) { + fieldType = Types.CLOB; + } else { + // TODO throws an exception. + } + field = owner; + } + + private boolean isBlob() { + if (fieldType == Types.BLOB) + return true; + return false; + } + private Select createSelect(OpenJPAStateManager sm, JDBCStore store) { + Select sel = store.getSQLFactory().newSelect(); + sel.select(field.getColumns()[0]); + sel.selectPrimaryKey(field.getDefiningMapping()); + sel.wherePrimaryKey(sm.getObjectId(), field.getDefiningMapping(), store); + sel.setLob(true); + return sel; + } +} \ No newline at end of file Index: openjpa-lib/src/main/java/org/apache/openjpa/lib/jdbc/DelegatingResultSet.java =================================================================== --- openjpa-lib/src/main/java/org/apache/openjpa/lib/jdbc/DelegatingResultSet.java (revision 567726) +++ openjpa-lib/src/main/java/org/apache/openjpa/lib/jdbc/DelegatingResultSet.java (working copy) @@ -436,11 +436,19 @@ _rs.updateBinaryStream(a, in, b); } + public void updateBlob(int a, Blob blob) throws SQLException { + _rs.updateBlob(a, blob); + } + public void updateCharacterStream(int a, Reader reader, int b) throws SQLException { _rs.updateCharacterStream(a, reader, b); } + public void updateClob(int a, Clob clob) throws SQLException { + _rs.updateClob(a, clob); + } + public void updateObject(int a, Object ob, int b) throws SQLException { _rs.updateObject(a, ob, b); } @@ -643,18 +651,10 @@ throw new UnsupportedOperationException(); } - public void updateBlob(int column, Blob blob) throws SQLException { - throw new UnsupportedOperationException(); - } - public void updateBlob(String columnName, Blob blob) throws SQLException { throw new UnsupportedOperationException(); } - public void updateClob(int column, Clob clob) throws SQLException { - throw new UnsupportedOperationException(); - } - public void updateClob(String columnName, Clob clob) throws SQLException { throw new UnsupportedOperationException(); } Index: openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/ReaderEntity.java =================================================================== --- openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/ReaderEntity.java (revision 0) +++ openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/ReaderEntity.java (revision 0) @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.meta.strats; + +import java.io.Reader; + +import javax.persistence.Entity; +import javax.persistence.Id; + +import org.apache.openjpa.persistence.Persistent; + +/** + * An entity with a Reader. + * + * @author Ignacio Andreu + * @since 1.0.0 + */ + +@Entity +public class ReaderEntity { + + @Id + int id; + + @Persistent + Reader reader; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public Reader getReader() { + return reader; + } + + public void setReader(Reader reader) { + this.reader = reader; + } + +} Index: openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/TestReader.java =================================================================== --- openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/TestReader.java (revision 0) +++ openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/TestReader.java (revision 0) @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.meta.strats; + +import java.io.ByteArrayInputStream; +import java.io.CharArrayReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.sql.SQLException; + +import javax.persistence.EntityManager; +import javax.persistence.Query; + +import org.apache.openjpa.persistence.test.SingleEMFTestCase; + +/** + * Test case of the Reader. + * + * @author Ignacio Andreu + * @since 1.0.0 + */ + +public class TestReader extends SingleEMFTestCase { + + protected void setUp() throws Exception { + setUp(ReaderEntity.class, CLEAR_TABLES, + "openjpa.jdbc.MappingDefaults", + "FieldStrategies=java.io.Reader= " + + "org.apache.openjpa.jdbc.meta.strats.LobFieldStrategy", + "openjpa.Log", + "SQL=TRACE"); + } + + public void insert(Reader reader) { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity re = new ReaderEntity(); + re.setId(1); + re.setReader(reader); + em.persist(re); + em.getTransaction().commit(); + em.close(); + } + + public void testInsertAndSelectReader() + throws IOException { + Reader reader = new CharArrayReader("oOOOOOo".toCharArray()); + insert(reader); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + Query q = em.createQuery("select o from ReaderEntity o"); + ReaderEntity entity = (ReaderEntity)q.getSingleResult(); + assertNotNull(entity.getReader()); + assertEquals("oOOOOOo", getReaderContent(entity.getReader())); + em.getTransaction().commit(); + em.close(); + } + + public void testInsertNullInputReader() { + insert(null); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity re = em.find(ReaderEntity.class, 1); + assertNull(re.getReader()); + em.getTransaction().commit(); + em.close(); + } + + public void testUpdateReader() throws IOException { + Reader reader = new CharArrayReader("oOOOOOo".toCharArray()); + insert(reader); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity entity = em.find(ReaderEntity.class, 1); + String string = "iIIIIIi"; + reader = new CharArrayReader(string.toCharArray()); + entity.setReader(reader); + em.persist(entity); + em.getTransaction().commit(); + em.close(); + em = emf.createEntityManager(); + em.getTransaction().begin(); + entity = em.find(ReaderEntity.class, 1); + assertEquals(string, getReaderContent(entity.getReader())); + em.getTransaction().commit(); + em.close(); + } + + public void testLoadReaderContent() { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity re = new ReaderEntity(); + ReaderWrapper rw = new ReaderWrapper(new CharArrayReader("oOOOOOo".toCharArray())); + re.setId(1); + re.setReader(rw); + em.persist(re); + em.getTransaction().commit(); + em.close(); + } + + public void testDeleteReader() { + Reader reader = new CharArrayReader("oOOOOOo".toCharArray()); + insert(reader); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity entity = em.find(ReaderEntity.class, 1); + em.remove(entity); + em.getTransaction().commit(); + em.close(); + em = emf.createEntityManager(); + em.getTransaction().begin(); + Query q = em.createQuery("select o from ReaderEntity o"); + assertEquals(0, q.getResultList().size()); + em.getTransaction().commit(); + em.close(); + } + + public void testLifeCycleInsertFlushModify() { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity re = new ReaderEntity(); + re.setId(1); + re.setReader(new CharArrayReader("oOOOOOo".toCharArray())); + em.persist(re); + em.flush(); + re.setReader(new CharArrayReader("iIIIIIi".toCharArray())); + em.persist(re); + em.getTransaction().commit(); + em.close(); + } + + public void testLifeCycleLoadFlushModifyFlush() { + Reader reader = new CharArrayReader("oOOOOOo".toCharArray()); + insert(reader); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity re = em.find(ReaderEntity.class, 1); + em.flush(); + re.setReader(new CharArrayReader("iIIIIIi".toCharArray())); + em.flush(); + em.getTransaction().commit(); + em.close(); + } + + public void testReadingMultipleTimesWithASingleConnection() throws IOException { + Reader reader = new CharArrayReader("oOOOOOo".toCharArray()); + insert(reader); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + ReaderEntity entity = em.find(ReaderEntity.class, 1); + String string = "iIIIIIi"; + reader = new CharArrayReader(string.toCharArray()); + entity.setReader(reader); + em.persist(entity); + em.getTransaction().commit(); + em.close(); + em = emf.createEntityManager(); + em.getTransaction().begin(); + entity = em.find(ReaderEntity.class, 1); + assertEquals(string, getReaderContent(entity.getReader())); + ReaderEntity re = new ReaderEntity(); + re.setId(2); + re.setReader(new CharArrayReader(string.toCharArray())); + em.persist(re); + //assertEquals(string, getReaderContent(entity.getReader())); + em.getTransaction().commit(); + em.close(); + //assertEquals(string, getReaderContent(entity.getReader())); + } + + public String getReaderContent(Reader reader) throws IOException { + String content= ""; + char[] cs = new char[4]; + int read = -1; + do { + read = reader.read(cs); + if (read == -1) { + //reader.reset(); + return content; + } + content = content + (new String(cs)).substring(0,read); + } while(true); + } +} Index: openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/ReaderWrapper.java =================================================================== --- openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/ReaderWrapper.java (revision 0) +++ openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/ReaderWrapper.java (revision 0) @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.meta.strats; + +import java.io.IOException; +import java.io.Reader; + +/** + * This class is used to kwon where the content of the Reader is load. + * If the content is load out of the flush then throws a UnsupportedOperationException + * + * @author Ignacio Andreu + * @since 1.0.0 + */ + +public class ReaderWrapper extends Reader { + private Reader reader; + + public ReaderWrapper(Reader reader) { + this.reader = reader; + } + + public void close() throws IOException { + reader.close(); + + } + + public int read(char[] cbuf, int off, int len) throws IOException { + StackTraceElement[] ste = Thread.currentThread().getStackTrace(); + for (StackTraceElement element : ste) { + if ("flush".equals(element.getMethodName())) { + return reader.read(cbuf, off, len); + } + } + throw new UnsupportedOperationException(); + } + + +} Index: openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/InputStreamEntity.java =================================================================== --- openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/InputStreamEntity.java (revision 0) +++ openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/InputStreamEntity.java (revision 0) @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.meta.strats; + +import java.io.InputStream; + +import javax.persistence.Entity; +import javax.persistence.Id; + +import org.apache.openjpa.persistence.Persistent; + +/** + * An entity with an InputStream. + * + * @author Ignacio Andreu + * @since 1.0.0 + */ + +@Entity +public class InputStreamEntity { + + @Id + private int id; + + @Persistent + private InputStream stream; + + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public InputStream getStream() { + return stream; + } + + public void setStream(InputStream stream) { + this.stream = stream; + } + +} Index: openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/TestInputStream.java =================================================================== --- openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/TestInputStream.java (revision 0) +++ openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/TestInputStream.java (revision 0) @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.meta.strats; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.SQLException; + +import javax.persistence.EntityManager; +import javax.persistence.Query; + +import org.apache.openjpa.persistence.test.SingleEMFTestCase; + +/** + * Test case of the InputStream. + * + * @author Ignacio Andreu + * @since 1.0.0 + */ + +/* TO-DO List: + a test that sets the value and then resets it without flushing + and a test thaht sets, flushes, and then resets + user-experience + Data cache integration + */ +public class TestInputStream extends SingleEMFTestCase { + + protected void setUp() throws Exception { + setUp(InputStreamEntity.class, CLEAR_TABLES, + "openjpa.jdbc.MappingDefaults", + "FieldStrategies=java.io.InputStream= " + + "org.apache.openjpa.jdbc.meta.strats.LobFieldStrategy", + "openjpa.Log", + "SQL=TRACE"); + } + + public void insert(InputStream stream) { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity ise = new InputStreamEntity(); + ise.setId(1); + ise.setStream(stream); + em.persist(ise); + em.getTransaction().commit(); + em.close(); + } + + public void testInsertAndSelectInputStream() + throws IOException { + InputStream stream = new ByteArrayInputStream("oOOOOOo".getBytes()); + insert(stream); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + em.flush(); + Query q = em.createQuery("select o from InputStreamEntity o"); + InputStreamEntity entity = (InputStreamEntity)q.getSingleResult(); + assertNotNull(entity.getStream()); + assertEquals("oOOOOOo", getInputStreamContent(entity.getStream())); + em.getTransaction().commit(); + em.close(); + } + + public void testInsertNullInputStream() { + insert(null); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity ise = em.find(InputStreamEntity.class, 1); + assertNull(ise.getStream()); + em.getTransaction().commit(); + em.close(); + } + + public void testUpdateInputStream() throws IOException { + InputStream stream = new ByteArrayInputStream("oOOOOOo".getBytes()); + insert(stream); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity entity = em.find(InputStreamEntity.class, 1); + String string = "iIIIIIi"; + stream = new ByteArrayInputStream(string.getBytes()); + entity.setStream(stream); + em.persist(entity); + em.getTransaction().commit(); + em.close(); + em = emf.createEntityManager(); + em.getTransaction().begin(); + entity = em.find(InputStreamEntity.class, 1); + assertEquals(string, getInputStreamContent(entity.getStream())); + em.getTransaction().commit(); + em.close(); + } + + public void testLoadInputStreamContent() { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity ise = new InputStreamEntity(); + InputStreamWrapper isw = new InputStreamWrapper(new ByteArrayInputStream("oOOOOOo".getBytes())); + ise.setId(1); + ise.setStream(isw); + em.persist(ise); + em.getTransaction().commit(); + em.close(); + } + + public void testDeleteInputStream() { + InputStream stream = new ByteArrayInputStream("oOOOOOo".getBytes()); + insert(stream); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity entity = em.find(InputStreamEntity.class, 1); + em.remove(entity); + em.getTransaction().commit(); + em.close(); + em = emf.createEntityManager(); + em.getTransaction().begin(); + Query q = em.createQuery("select o from InputStreamEntity o"); + assertEquals(0, q.getResultList().size()); + em.getTransaction().commit(); + em.close(); + } + + public void testLifeCycleInsertFlushModify() { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity ise = new InputStreamEntity(); + ise.setId(1); + ise.setStream(new ByteArrayInputStream("oOOOOOo".getBytes())); + em.persist(ise); + em.flush(); + ise.setStream(new ByteArrayInputStream("iIIIIIi".getBytes())); + em.persist(ise); + em.getTransaction().commit(); + em.close(); + } + + public void testLifeCycleLoadFlushModifyFlush() { + InputStream stream = new ByteArrayInputStream("oOOOOOo".getBytes()); + insert(stream); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity ise = em.find(InputStreamEntity.class, 1); + em.flush(); + ise.setStream(new ByteArrayInputStream("iIIIIIi".getBytes())); + em.flush(); + em.getTransaction().commit(); + em.close(); + } + + public void testReadingMultipleTimesWithASingleConnection() throws IOException { + InputStream stream = new ByteArrayInputStream("oOOOOOo".getBytes()); + insert(stream); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + InputStreamEntity entity = em.find(InputStreamEntity.class, 1); + String string = "iIIIIIi"; + stream = new ByteArrayInputStream(string.getBytes()); + entity.setStream(stream); + em.persist(entity); + em.getTransaction().commit(); + em.close(); + em = emf.createEntityManager(); + em.getTransaction().begin(); + entity = em.find(InputStreamEntity.class, 1); + assertEquals(string, getInputStreamContent(entity.getStream())); + InputStreamEntity ise = new InputStreamEntity(); + ise.setId(2); + ise.setStream(new ByteArrayInputStream(string.getBytes())); + em.persist(ise); + //assertEquals(string, getInputStreamContent(entity.getStream())); + em.getTransaction().commit(); + em.close(); + //assertEquals(string, getInputStreamContent(entity.getStream())); + } + + public String getInputStreamContent(InputStream is) throws IOException { + String content = ""; + byte[] bs = new byte[4]; + int read = -1; + do { + read = is.read(bs); + if (read == -1) { + //is.reset(); + return content; + } + content = content + (new String(bs)).substring(0, read); + } while(true); + } + +} Index: openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/InputStreamWrapper.java =================================================================== --- openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/InputStreamWrapper.java (revision 0) +++ openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/jdbc/meta/strats/InputStreamWrapper.java (revision 0) @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.meta.strats; + +import java.io.IOException; +import java.io.InputStream; + +/** + * This class is used to kwon where the content of the InputStream is load. + * If the content is load out of the flush then throws a UnsupportedOperationException + * + * @author Ignacio Andreu + * @since 1.0.0 + */ + +public class InputStreamWrapper extends InputStream { + + private InputStream is; + + public InputStreamWrapper(InputStream is) { + this.is = is; + } + + public int read() throws IOException { + throw new UnsupportedOperationException(); + } + + public int available() throws IOException { + return is.available(); + } + + public void close() throws IOException { + is.close(); + } + + public int read(byte[] b, int off, int len) throws IOException { + StackTraceElement[] ste = Thread.currentThread().getStackTrace(); + for (StackTraceElement element : ste) { + if ("flush".equals(element.getMethodName())) { + return is.read(b, off, len); + } + } + throw new UnsupportedOperationException(); + } + + public int read(byte[] b) throws IOException { + throw new UnsupportedOperationException(); + } + +} Index: openjpa-persistence-jdbc/pom.xml =================================================================== --- openjpa-persistence-jdbc/pom.xml (revision 567726) +++ openjpa-persistence-jdbc/pom.xml (working copy) @@ -135,7 +135,7 @@ mysql mysql-connector-java - 5.0.4 + 5.0.5 test