Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
8.0.0
-
None
-
Nodejs v16.13.0
Description
Trying to code a real-time stream from an async iterable of objects to an IPC Streaming format file I'm getting a TypeError.
The idea is to stream every message to the arrow file as soon as it arrives without waiting to build the complete table to stream it. To take advantage of the stream event handling, I'm using the functional approach of node:stream module (Nodejs v16.13.0).
The async iterable contains messages that are JS objects containing different data types, for example:
{ id: '6345', product: 'foo', price: 62.78, created: '2022-05-01T16:01:00.105Z', }
Code to replicate the error:
const { Struct, Field, Utf8, Float32, TimestampMillisecond, RecordBatchReader, RecordBatchStreamWriter, builderThroughAsyncIterable, } = require('apache-arrow') const fs = require("fs"); const path = require("path"); const {pipeline} = require('node:stream'); const asyncIterable = { [Symbol.asyncIterator]: async function* () { while (true) { const obj = { id: Math.floor(Math.random() * 10).toString(), product: 'foo', price: Math.random() + Math.floor(Math.random() * 10), created: new Date(), } yield obj; // insert some asynchrony await new Promise((r) => setTimeout(r, 1000)); } } } async function streamToArrow(messagesAsyncIterable) { const message_type = new Struct([ new Field('id', new Utf8, false), new Field('product', new Utf8, false), new Field('price', new Float32, false), new Field('created', new TimestampMillisecond, false), ]); const builderOptions = { type: message_type, nullValues: [null, 'n/a', undefined], highWaterMark: 30, queueingStrategy: 'count', }; const transform = builderThroughAsyncIterable(builderOptions); let file_path = './ipc_stream.arrow'; const fsWriter = fs.createWriteStream(file_path); pipeline( RecordBatchReader .from(transform(messagesAsyncIterable)) .toNodeStream(), // Throws TypeError: RecordBatchReader.from(...).toNodeStream is not a function RecordBatchStreamWriter.throughNode(), fsWriter, (err, value) => { if (err) { console.error(err); } else { console.log(value, 'value returned'); } } ).on('close', () => { console.log('Closed pipeline') }); } streamToArrow(asyncIterable)