Arrow is becoming a standard interchange format for columnar structured data. This is already true in Spark, which uses Arrow for the Pandas UDF functions in the DataFrame API.
However, the current Arrow implementation in Spark is limited to two use cases:
- Pandas UDFs, which allow operations on one or more columns in the DataFrame API.
- Collect as Pandas, which pulls the entire dataset back to the driver as a Pandas DataFrame.
What is still hard, however, is operating on all of the columns of a DataFrame while staying distributed across the workers. The only way to do this today is to drop down into RDDs and collect the rows back into a DataFrame, but pickling each row is very slow and the collect is expensive.
The proposal is to extend Spark so that users can operate directly on an Arrow Table while still making use of Spark's underlying execution engine. Some examples of what this new API makes possible:
- Pass the Arrow Table, zero-copy, to PyTorch for predictions.
- Pass it to NVIDIA RAPIDS to run an algorithm on the GPU.
- Distribute data across many GPUs, making use of the new barrier execution API.
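The zero-copy handoff in the first example means that a consumer reads the very buffers Arrow already holds instead of receiving a serialized copy. As a stdlib-only illustration of that principle (no Arrow or PyTorch involved; `column` is just a stand-in for an Arrow column buffer), a `memoryview` exposes a producer's buffer to a consumer without duplicating any bytes:

```python
# Illustration of the zero-copy principle using only the stdlib:
# a memoryview shares the producer's buffer instead of copying it,
# which is analogous to how Arrow hands column buffers to consumers.
import array

# A "column" of int64 values in one contiguous buffer
# (a stand-in for an Arrow column buffer).
column = array.array("q", [1, 2, 3, 4])

# Zero-copy view: no bytes are duplicated.
view = memoryview(column)

# The consumer sees writes to the underlying buffer,
# proving nothing was copied.
column[0] = 42
assert view[0] == 42

# A real copy (tolist) does not track the source buffer.
copied = column.tolist()
column[1] = 99
assert copied[1] == 2 and view[1] == 99
```

The same property is what makes an `RDD[ArrowTable]` attractive: downstream libraries can consume the batches without a per-row serialization step.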
Target users: ML engineers, data scientists, and future library authors.
- Conversion from any Dataset[Row] or PySpark DataFrame to RDD[Table].
- Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], or a PySpark DataFrame.
- Open the door to tighter integration between Arrow, Pandas, and Spark, especially at the library level.
- Avoid creating a new API surface; instead, build on existing APIs.
case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
As mentioned in the first section, the goal is to make it easier for Spark users to interact with Arrow tools and libraries. This does, however, come with some considerations from a Spark perspective.
Arrow is column-based rather than row-based. In the proposed RDD[ArrowTable] API, each RDD element is in fact a block of data rather than a single row. A further proposal in this regard is to introduce a new Spark configuration parameter, spark.sql.execution.arrow.maxRecordsPerTable, which decides how many records are included in a single Arrow Table. If set to -1, the entire partition is included in one table; otherwise each table holds at most that many records. Within a table, Arrow's normal batching mechanism is used to split the records into multiple record batches, whose size is still dictated by arrowMaxRecordsPerBatch.
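To make the interplay of the two settings concrete, here is a small stdlib-only sketch of the proposed chunking policy (the function name and arguments are illustrative, not part of any existing Spark API): a partition is first cut into tables of at most maxRecordsPerTable records, or a single table for the whole partition when the setting is -1, and each table is then cut into record batches of at most maxRecordsPerBatch records.

```python
# Hypothetical sketch of how one partition's records could be grouped into
# Arrow tables and record batches under the proposed settings.
# Pure Python, no Spark or Arrow required; all names are illustrative only.

def chunk_partition(num_records, max_records_per_table, max_records_per_batch):
    """Return one list per table, each listing that table's batch sizes."""
    # -1 means: put the entire partition into a single Arrow Table.
    if max_records_per_table == -1:
        table_sizes = [num_records]
    else:
        table_sizes = [min(max_records_per_table, num_records - i)
                       for i in range(0, num_records, max_records_per_table)]
    # Within each table, Arrow's normal batching still applies.
    return [[min(max_records_per_batch, size - j)
             for j in range(0, size, max_records_per_batch)]
            for size in table_sizes]

# 10 records, tables of at most 4, batches of at most 3:
assert chunk_partition(10, 4, 3) == [[3, 1], [3, 1], [2]]
# -1 keeps the whole partition in one table, split into batches of 3:
assert chunk_partition(10, -1, 3) == [[3, 3, 3, 1]]
```

The sketch shows why both knobs are useful: maxRecordsPerTable bounds how much data a user-facing table spans, while maxRecordsPerBatch keeps the individual Arrow record batches inside it at a manageable size.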