Apache Arrow / ARROW-7383

[C++] [Java] Cannot Read In Java/Scala Streaming Arrow Files Generated In C++


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: C++, Java
    • Labels: None

    Description

      I'm working on a project to shuttle data back and forth between a Java process and a C++ process. I can read the Java stream in C++. However, I can't read the C++ stream in my Java project: when I try, it fails while deserializing the message.

      Exception in thread "main" java.lang.IllegalArgumentException
          at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
          at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
          at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
          at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.<init>(ArrowConverters.scala:229)
          at org.apache.spark.sql.execution.arrow.ArrowConverters$.getBatchesFromStream(ArrowConverters.scala:228)
          at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:216)
          at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:214)
          at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
          at org.apache.spark.sql.execution.arrow.ArrowConverters$.readArrowStreamFromFile(ArrowConverters.scala:214)
          at org.apache.spark.sql.execution.arrow.MyArrowReader.rawRead(MyArrowReader.scala:23)
          at com.adobe.compute.PlatformGet$.main(PlatformGet.scala:121)
          at com.adobe.compute.PlatformGet.main(PlatformGet.scala)

      I looked at the code in MessageSerializer.java, and it appears to expect the message size to be stored in the first four bytes of the file. In the C++ file those four bytes are [-1, -1, -1, -1]; in the Java file they are not. I don't know much about the internals of the Arrow streaming format, so I can't say which one is correct.

      I can read the streaming file created in Java back in Java (so Java is compatible with itself). The C++ code can read both the Java streaming file and the C++ streaming file.

      Here is my C++ code, which reads the Java stream and writes a stream of its own:

      #include <cstdlib>
      #include <iostream>
      #include <memory>
      #include <vector>

      #include <arrow/api.h>
      #include <arrow/io/api.h>
      #include <arrow/ipc/api.h>

      // Drain a stream reader into a Table.
      arrow::Status Table__from_RecordBatchStreamReader(
              const std::shared_ptr<arrow::ipc::RecordBatchReader>& reader,
              std::shared_ptr<arrow::Table>* table) {
          std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
          std::shared_ptr<arrow::RecordBatch> cur;
          do {
              ARROW_RETURN_NOT_OK(reader->ReadNext(&cur));
              if (cur)
                  batches.push_back(cur);
          } while (cur);  // ReadNext yields nullptr at end of stream
          return arrow::Table::FromRecordBatches(batches, table);
      }

      // Print the error and return from main() on a non-OK Status.
      // No trailing semicolon, so call sites read EXIT_ON_FAILURE(x);
      #define EXIT_ON_FAILURE(expr)                      \
        do {                                             \
          arrow::Status status_ = (expr);                \
          if (!status_.ok()) {                           \
            std::cerr << status_.message() << std::endl; \
            return EXIT_FAILURE;                         \
          }                                              \
        } while (0)

      int main() {
          // Read the stream written by Java (operator* assumes Open succeeded).
          std::shared_ptr<arrow::io::InputStream> infile
              = *arrow::io::MemoryMappedFile::Open("/tmp/java-arrow-stream.arrow", arrow::io::FileMode::READ);
          std::shared_ptr<arrow::ipc::RecordBatchReader> reader;
          EXIT_ON_FAILURE(arrow::ipc::RecordBatchStreamReader::Open(infile, &reader));
          std::shared_ptr<arrow::Table> table;
          EXIT_ON_FAILURE(Table__from_RecordBatchStreamReader(reader, &table));
          auto schema = reader->schema();

          // Write the same table back out as a C++ stream.
          std::shared_ptr<arrow::io::OutputStream> outputStream
              = *arrow::io::FileOutputStream::Open("/tmp/cpparrow-stream.arrow");
          std::shared_ptr<arrow::ipc::RecordBatchWriter> recordBatchStreamWriter;
          EXIT_ON_FAILURE(arrow::ipc::RecordBatchStreamWriter::Open(
                      outputStream.get(), schema, &recordBatchStreamWriter));
          EXIT_ON_FAILURE(recordBatchStreamWriter->WriteTable(*table));
          // Close() finishes the stream (end-of-stream marker); then close the file.
          EXIT_ON_FAILURE(recordBatchStreamWriter->Close());
          EXIT_ON_FAILURE(outputStream->Close());
          return EXIT_SUCCESS;
      }

      Attachments

        1. java-arrow-stream.arrow (3 kB, David Wilcox)
        2. cpparrow-stream.arrow (3 kB, David Wilcox)

          People

            Assignee: Unassigned
            Reporter: David Wilcox (davidlukewilcox)
            Votes: 0
            Watchers: 3
