SPARK-13534

Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas

    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

      Description

      The current code path for accessing Spark DataFrame data in Python using PySpark passes through an inefficient serialization-deserialization process that I've examined at a high level here: https://gist.github.com/wesm/0cb5531b1c2e346a0007. Currently, RDD[Row] objects are deserialized in pure Python as a list of tuples, which are then converted to a pandas.DataFrame using its from_records alternate constructor. This also uses a large amount of memory.
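
      For reference, the existing conversion is roughly the following (a simplified sketch, not the exact Spark source):

      import pandas as pd

      def to_pandas_slow(df):
          # df.collect() materializes every Row as a tuple-like object in
          # Python memory before pandas ever sees the data.
          rows = df.collect()
          return pd.DataFrame.from_records(rows, columns=df.columns)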

      For flat (no nested types) schemas, the Apache Arrow memory layout (https://github.com/apache/arrow/tree/master/format) can be deserialized to pandas.DataFrame objects with small overhead relative to memcpy / system memory bandwidth: only Arrow's null bitmasks must be examined, with the corresponding null values replaced by pandas's sentinel values (None or NaN as appropriate).
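
      A minimal illustration of this null handling, using pyarrow's current conversion API (which did not yet exist when this issue was filed):

      import pyarrow as pa

      # A value that is null in Arrow's validity bitmap comes back as a
      # pandas sentinel (NaN for a float64 column) after conversion.
      table = pa.table({"x": [1.5, None, 3.0]})
      print(table.to_pandas())   # the null appears as NaN in column "x"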

      I will be contributing patches to Arrow in the coming weeks for converting between Arrow and pandas in the general case, so if Spark can send Arrow memory to PySpark, we will hopefully be able to increase the Python data access throughput by an order of magnitude or more. I propose to add a new serializer for Spark DataFrame and a new method that can be invoked from PySpark to request an Arrow memory-layout byte stream, prefixed by a data header indicating array buffer offsets and sizes.

  Attachments

  • benchmark.py (2 kB, Bryan Cutler)

        Issue Links

          Activity

          Sean Owen added a comment -

           Wes McKinney, there's already https://issues.apache.org/jira/browse/SPARK-13391; is this meaningfully different? The other may be too broad to be useful, and we could close it.

          Wes McKinney added a comment -

           SPARK-13391 would need to have its scope more narrowly defined. Perhaps we can modify that issue's scope to encompass UDF evaluation using Arrow for data interchange? Otherwise we can close it.

          This issue is a first step in that direction, but UDF evaluation is not in scope for this issue (this is only about raw table data movement).

          Frederick Reiss added a comment -

          Wes McKinney, are you planning to work on this issue soon?

          holdenk added a comment -

           For people following along: Arrow is in the middle of voting on its next release. While it's likely not yet at the point where we can start using it, it will be good for those interested (like myself) to take a look once the release is ready.

          holdenk added a comment -

           And now they have a release. I'm not certain it's at the stage where we can use it, but I'll do some poking over the next few weeks.

          Frederick Reiss added a comment -

          We (Bryan Cutler, holdenk, Xusen Yin, and myself) are looking into this.
          Here's a rough outline of the current planned approach:

          • Add a dependency on Arrow 0.1's Java and Scala APIs to Spark.
          • Add a new developer API method to Dataset, collectAsArrow(), that returns an array of byte arrays, where each byte array contains a block of records in Arrow format. The conversion to Arrow will be a streamlined version of the Parquet conversion in ParquetWriteSupport (minus all the callbacks and levels of indirection). Conversion of complex types (Struct, Array, Map) to Arrow will not be supported in this version.
           • Modify PySpark's DataFrame.toPandas method to use the following logic (a sketch follows this list):
            if (the schema of the DataFrame does not contain complex types)
                Call collectAsArrow() on the underlying Scala Dataset.
                Pull the resulting buffers of Arrow data over to the Python process.
                Use Arrow's Python APIs to convert the buffers into a single Pandas dataframe.
            else
                Use the existing code as a slow-path conversion.
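
           A minimal sketch of what that fast path could look like, assuming the proposed collectAsArrow() developer API on the JVM side and using current pyarrow IPC APIs for illustration (the helper names here are hypothetical):

           import pyarrow as pa

           def to_pandas_fast(df):
               # Hypothetical guard: fall back to the existing conversion for
               # schemas containing complex types (Struct, Array, Map).
               if _has_complex_types(df.schema):
                   return to_pandas_slow(df)
               # Hypothetical JVM call returning an array of byte arrays, each
               # holding a block of records serialized in Arrow format.
               arrow_blocks = df._jdf.collectAsArrow()
               batches = []
               for block in arrow_blocks:
                   reader = pa.ipc.open_stream(pa.py_buffer(bytes(block)))
                   batches.extend(reader)   # the reader yields record batches
               # Stitch the blocks into one table and convert to pandas.
               return pa.Table.from_batches(batches).to_pandas()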
            
          Bryan Cutler added a comment -

          I've been working on this with Xusen Yin. We have a very rough WIP branch that I'll link here in case others want to pitch in or review while we are working out the kinks.

          Apache Spark added a comment -

          User 'BryanCutler' has created a pull request for this issue:
          https://github.com/apache/spark/pull/15821

          Li Jin added a comment -

          Bryan Cutler,

           Allow me to introduce myself. I am Li Jin, and I am working with Wes McKinney on PySpark/Arrow. I have been looking into this issue recently and don't want to duplicate effort. I think I can help by writing unit tests to validate the Arrow record batches created. What do you think?
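
           One possible shape for such a test, assuming the Arrow path is exposed through toPandas (or a helper like the to_pandas_fast sketch above), is to compare its output against the existing conversion on a flat schema:

           import pandas.testing as pdt

           def test_arrow_conversion_matches_existing(spark):
               # "spark" is assumed to be a SparkSession test fixture.
               df = spark.range(0, 1000).selectExpr(
                   "id",
                   "cast(id as double) as x",
                   "id % 2 = 0 as flag")
               expected = df.toPandas()        # existing row-based path
               actual = to_pandas_fast(df)     # Arrow-based path (sketch above)
               pdt.assert_frame_equal(actual, expected)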

          Bryan Cutler added a comment -

          Hi Li Jin, that sounds great! We could definitely use some help for validation testing.

          Bryan Cutler added a comment -

          Script for benchmarks

          Jacques Nadeau added a comment -

          Anybody know some committers we can get to look at this?

          holdenk added a comment -

           So I'm following along with the progress on this; I'll try to take a more thorough look this Thursday.

          Jacques Nadeau added a comment -

          Great, thanks holdenk!


            People

            • Assignee: Unassigned
            • Reporter: Wes McKinney
            • Shepherd: Reynold Xin
            • Votes: 5
            • Watchers: 65
