  1. Spark
  2. SPARK-37227

DataFrame.mapInArrow

Details

    • Type: Umbrella
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.0
    • Fix Version/s: None
    • Component/s: PySpark, SQL
    • Labels: None

    Description

      Background

      Usability problem

      On the Scala side, Apache Spark already has Arrow integration and developer APIs (for example, ArrowConverters), but these are inconvenient to use out of the box.

      In PySpark, users who want to exchange data with external systems in Arrow format must manually convert the pandas DataFrame inside a pandas UDF into an Arrow batch, which is inconvenient.

      Technical problem

      pandas UDFs are also used as a workaround for this use case: users convert the pandas DataFrame back
      to an Arrow batch and integrate with other systems; see https://github.com/apache/spark/pull/26783#issue-534127514.
      However, this approach does not vectorize operations because pandas does not support nested structures
      natively, and the performance impact seems non-trivial.

      In addition, it effectively requires copying the data at each conversion between pandas and Arrow format (Spark internal format -> Arrow format -> pandas DataFrame -> Arrow format), which consumes computation. See https://github.com/apache/spark/pull/26783#issue-534127514 for the performance impact.

      Proposal

      I would like to propose a new API, DataFrame.mapInArrow, analogous to DataFrame.mapInPandas and RDD.mapPartitions.

      The API shape would look like:

      Scala:

      def mapInArrow(
          f: Iterator[ArrowRecordBatch] => Iterator[ArrowRecordBatch],
          schema: StructType): DataFrame = {
        // ...
      }
      
      df.mapInArrow(_.map { case arrowBatch: ArrowRecordBatch =>
        // do something with `ArrowRecordBatch` and create new `ArrowRecordBatch`.
        // ...
        arrowBatch
      }, df.schema).show()
      

      Python:

      def mapInArrow(
              self,
              func: Callable[[Iterator[pyarrow.RecordBatch]], Iterator[pyarrow.RecordBatch]],
              schema: StructType) -> DataFrame:
          # ...
      
      def do_something(iterator):
          for arrow_batch in iterator:
              # do something with `pyarrow.RecordBatch` and create new `pyarrow.RecordBatch`.
              # ...
              yield arrow_batch
      
      df.mapInArrow(do_something, df.schema).show()
      

            People

              Assignee: Unassigned
              Reporter: Hyukjin Kwon (gurwls223)