Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Not A Problem
Description
I'm working on a project that shuttles data back and forth between a Java process and a C++ process. I can read the stream written by Java in C++. However, I can't read the stream written by C++ in my Java project: the message fails to deserialize with the following exception:
Exception in thread "main" java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.<init>(ArrowConverters.scala:229)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.getBatchesFromStream(ArrowConverters.scala:228)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:216)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:214)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.readArrowStreamFromFile(ArrowConverters.scala:214)
	at org.apache.spark.sql.execution.arrow.MyArrowReader.rawRead(MyArrowReader.scala:23)
	at com.adobe.compute.PlatformGet$.main(PlatformGet.scala:121)
	at com.adobe.compute.PlatformGet.main(PlatformGet.scala)
I looked at the code in MessageSerializer.java, and it appears to expect the message size to be stored in the first four bytes of the file. In the C++-written file those four bytes are [-1, -1, -1, -1], but in the Java-written file they are not. I don't know much about the internals of the Arrow streaming format, so I can't say what they are supposed to be.
I can read the Java-written stream file back in Java, so Java is compatible with itself. The C++ code can read both the Java-written and the C++-written stream files.
Here is my C++ code, which reads the Java stream and then writes a stream of its own:
#include <cstdlib>
#include <iostream>
#include <memory>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>

// Read every batch from a stream reader and assemble them into one table.
arrow::Status Table__from_RecordBatchStreamReader(
    const std::shared_ptr<arrow::ipc::RecordBatchReader>& reader,
    std::shared_ptr<arrow::Table>* table) {
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
  std::shared_ptr<arrow::RecordBatch> cur;
  do {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&cur));
    if (cur) batches.push_back(cur);
  } while (cur);  // ReadNext yields a null batch at the end of the stream
  return arrow::Table::FromRecordBatches(batches, table);
}

// Print the error and exit, instead of continuing with null objects.
#define EXIT_ON_FAILURE(expr)                       \
  do {                                              \
    arrow::Status status_ = (expr);                 \
    if (!status_.ok()) {                            \
      std::cerr << status_.ToString() << std::endl; \
      return EXIT_FAILURE;                          \
    }                                               \
  } while (0)

int main() {
  // Read the stream written by the Java process.
  auto infile_result = arrow::io::MemoryMappedFile::Open(
      "/tmp/java-arrow-stream.arrow", arrow::io::FileMode::READ);
  EXIT_ON_FAILURE(infile_result.status());
  std::shared_ptr<arrow::io::InputStream> infile = *infile_result;

  std::shared_ptr<arrow::ipc::RecordBatchReader> reader;
  EXIT_ON_FAILURE(arrow::ipc::RecordBatchStreamReader::Open(infile, &reader));
  std::shared_ptr<arrow::Table> table;
  EXIT_ON_FAILURE(Table__from_RecordBatchStreamReader(reader, &table));
  auto schema = reader->schema();

  // Write the same table back out as a C++ stream.
  auto outfile_result =
      arrow::io::FileOutputStream::Open("/tmp/cpparrow-stream.arrow");
  EXIT_ON_FAILURE(outfile_result.status());
  std::shared_ptr<arrow::io::OutputStream> outputStream = *outfile_result;

  std::shared_ptr<arrow::ipc::RecordBatchWriter> recordBatchStreamWriter;
  EXIT_ON_FAILURE(arrow::ipc::RecordBatchStreamWriter::Open(
      outputStream.get(), schema, &recordBatchStreamWriter));
  EXIT_ON_FAILURE(recordBatchStreamWriter->WriteTable(*table));
  // Close() writes the end-of-stream marker; then close the file itself.
  EXIT_ON_FAILURE(recordBatchStreamWriter->Close());
  EXIT_ON_FAILURE(outputStream->Close());
  return 0;
}