Uploaded image for project: 'Apache Arrow'
  1. Apache Arrow
  2. ARROW-16705

[JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a function

    XML · Word · Printable · JSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 8.0.0
    • None
    • JavaScript
    • Nodejs v16.13.0

    Description

      While trying to stream an async iterable of objects in real time to a file in the Arrow IPC streaming format, I get a TypeError.

      The idea is to write each message to the Arrow file as soon as it arrives, without first waiting to build the complete table. To take advantage of stream event handling, I'm using the functional `pipeline()` API of the `node:stream` module (Node.js v16.13.0).

      The async iterable yields messages that are plain JS objects with fields of different data types, for example:

      {
          id: '6345',
          product: 'foo',
          price: 62.78,
          created: '2022-05-01T16:01:00.105Z',
      }

      Code to reproduce the error:

      const {
          Struct, Field, Utf8, Float32, TimestampMillisecond,
          RecordBatchReader, RecordBatchStreamWriter,
          builderThroughAsyncIterable,
      } = require('apache-arrow')
      const fs = require("fs");
      const path = require("path");
      const {pipeline} = require('node:stream');
      
      /**
       * Endless async source of fake product messages.
       *
       * Each yielded message has a single-digit string `id`, the constant
       * `product` 'foo', a random `price` in [0, 10) and a `created` Date taken
       * at generation time. After each message the generator waits 1000 ms
       * before producing the next one, to simulate an asynchronous producer.
       */
      const asyncIterable = {
          async *[Symbol.asyncIterator]() {
              const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
              for (;;) {
                  // Random draws happen in this order: id digit, fractional part, whole part.
                  const digit = Math.floor(Math.random() * 10);
                  const fraction = Math.random();
                  const message = {
                      id: String(digit),
                      product: 'foo',
                      price: fraction + Math.floor(Math.random() * 10),
                      created: new Date(),
                  };
                  yield message;
                  // Delay before the next message so consumers see real asynchrony.
                  await pause(1000);
              }
          },
      };
      
      /**
       * Reproduction for ARROW-16705: stream messages from `messagesAsyncIterable`
       * into an Arrow IPC stream file (./ipc_stream.arrow) with node:stream pipeline().
       *
       * @param {AsyncIterable<object>} messagesAsyncIterable - source of plain JS
       *     message objects with `id`, `product`, `price` and `created` fields.
       * @returns {Promise<void>} resolves right after the pipeline is set up (the
       *     body contains no `await`); because the function is `async` and has no
       *     try/catch, the TypeError thrown by `.toNodeStream()` becomes a rejection
       *     of this promise.
       */
      async function streamToArrow(messagesAsyncIterable) {
          // Row type: a Struct with four fields. NOTE(review): the trailing `false`
          // is presumably the Field `nullable` flag — confirm against Arrow JS docs.
          const message_type = new Struct([
              new Field('id', new Utf8, false),
              new Field('product', new Utf8, false),
              new Field('price', new Float32, false),
              new Field('created', new TimestampMillisecond, false),
          ]);
      
          // Options for builderThroughAsyncIterable; `nullValues` lists the input
          // values to treat as null. NOTE(review): highWaterMark 30 with the 'count'
          // strategy presumably emits a chunk every 30 rows — confirm.
          const builderOptions = {
              type: message_type,
              nullValues: [null, 'n/a', undefined],
              highWaterMark: 30,
              queueingStrategy: 'count',
          };
      
          // NOTE(review): `transform(iterable)` presumably yields built Arrow Vectors,
          // not IPC bytes or RecordBatches — verify what RecordBatchReader.from()
          // accepts before feeding it this output.
          const transform = builderThroughAsyncIterable(builderOptions);  
          let file_path = './ipc_stream.arrow';
          // Opened before pipeline() is called, so the file stream already exists
          // when the reader expression below throws.
          const fsWriter = fs.createWriteStream(file_path);
      
          // pipeline() arguments are evaluated left to right: the reader expression
          // runs (and throws) before RecordBatchStreamWriter.throughNode() is reached.
          pipeline(
              RecordBatchReader
                  .from(transform(messagesAsyncIterable))
                  // NOTE(review): for an async-iterable source, RecordBatchReader.from()
                  // appears to return a Promise of a reader rather than a reader, so
                  // `.toNodeStream` is undefined here — confirm against Arrow JS docs.
                  .toNodeStream(),  // Throws TypeError: RecordBatchReader.from(...).toNodeStream is not a function
              RecordBatchStreamWriter.throughNode(),
              fsWriter,
              // Completion callback. NOTE(review): per the Node docs, `value` is only
              // set when the destination resolves a promise; with fsWriter it is
              // presumably undefined.
              (err, value) => {
                  if (err) {
                      console.error(err);
                  } else {
                      console.log(value, 'value returned');
                  }
              }
          // pipeline() returns its last stream (fsWriter), so this listens for the
          // file stream's 'close' event.
          ).on('close', () => {
              console.log('Closed pipeline')
          });
      
      }
      
      // streamToArrow() is async, so any error it throws (including the reported
      // TypeError) arrives as a rejected promise. Handle it explicitly instead of
      // leaving the promise floating as an unhandled rejection.
      streamToArrow(asyncIterable).catch((err) => {
          console.error(err);
          process.exitCode = 1;
      });

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            vic-bonilla Victor Bonilla
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: