Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.3.0
    • Component/s: PySpark
    • Labels:
      None

      Description

      The current code path for accessing Spark DataFrame data in Python using PySpark passes through an inefficient serialization-deserialization process that I've examined at a high level here: https://gist.github.com/wesm/0cb5531b1c2e346a0007. Currently, RDD[Row] objects are deserialized in pure Python as a list of tuples, which are then converted to a pandas.DataFrame using its from_records alternate constructor. This also uses a large amount of memory.
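
      For reference, the existing conversion is roughly equivalent to the sketch below (illustrative only; df is assumed to be a PySpark DataFrame):

        import pandas as pd

        # Existing slow path (sketch): every Row is shipped to Python and
        # deserialized as a tuple, then pandas builds the frame from records.
        rows = df.collect()  # list of Row objects, materialized in pure Python
        pdf = pd.DataFrame.from_records(rows, columns=df.columns)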

      For flat (no nested types) schemas, the Apache Arrow memory layout (https://github.com/apache/arrow/tree/master/format) can be deserialized to pandas.DataFrame objects with small overhead relative to memcpy / system memory bandwidth: only Arrow's null bitmasks need to be examined, with the corresponding null values replaced by pandas's sentinel values (None or NaN, as appropriate).
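
      As a small illustration of that null handling (a sketch using today's pyarrow bindings, which postdate this discussion):

        import pyarrow as pa

        # Nulls tracked in Arrow's validity bitmaps become pandas sentinels on
        # conversion; the integer column is promoted to float64 so None -> NaN.
        table = pa.table({"x": [1, None, 3], "y": ["a", "b", None]})
        pdf = table.to_pandas()  # x -> [1.0, NaN, 3.0]; y -> ["a", "b", None]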

      I will be contributing patches to Arrow in the coming weeks for converting between Arrow and pandas in the general case, so if Spark can send Arrow memory to PySpark, we will hopefully be able to increase the Python data access throughput by an order of magnitude or more. I propose to add a new serializer for Spark DataFrame and a new method that can be invoked from PySpark to request an Arrow memory-layout byte stream, prefixed by a data header indicating array buffer offsets and sizes.
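
      A rough sketch of what the receiving side could look like, assuming the proposed method (the name collect_as_arrow_stream below is hypothetical) hands the Python process a buffer containing an Arrow stream:

        import pyarrow as pa

        # Hypothetical: buf holds the proposed byte stream (data header plus
        # Arrow record batches) produced by the new Spark-side serializer.
        buf = df._jdf.collect_as_arrow_stream()  # hypothetical JVM call
        reader = pa.ipc.open_stream(buf)
        pdf = reader.read_all().to_pandas()  # nulls become None/NaN sentinels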

      Attachments

      • benchmark.py (2 kB, attached by Bryan Cutler)

          Activity

          srowen Sean Owen added a comment -

           Wes McKinney there's already https://issues.apache.org/jira/browse/SPARK-13391 – is this meaningfully different? The other may be too broad to be useful, and we can close it.

          wesmckinn Wes McKinney added a comment -

          SPARK-13391 would need to have its scope more narrowly defined. Perhaps we can modify that issue scope to encompass UDF evaluation using Arrow for data interchange? Otherwise we can close it.

          This issue is a first step in that direction, but UDF evaluation is not in scope for this issue (this is only about raw table data movement).

          freiss Frederick Reiss added a comment -

          Wes McKinney, are you planning to work on this issue soon?

          holdenk holdenk added a comment -

           For people following along: Arrow is in the middle of voting on its next release. While it's likely not yet at the point where we can start using it, it will be good for those interested (like myself) to take a look once the release is ready.

          holdenk holdenk added a comment -

           And now they have a release. I'm not certain it's at the stage where we can use it, but I'll do some poking over the next few weeks.

          freiss Frederick Reiss added a comment -

          We (Bryan Cutler, holdenk, Xusen Yin, and myself) are looking into this.
          Here's a rough outline of the current planned approach:

          • Add a dependency on Arrow 0.1's Java and Scala APIs to Spark.
          • Add a new developer API method to Dataset, collectAsArrow(), that returns an array of byte arrays, where each byte array contains a block of records in Arrow format. The conversion to Arrow will be a streamlined version of the Parquet conversion in ParquetWriteSupport (minus all the callbacks and levels of indirection). Conversion of complex types (Struct, Array, Map) to Arrow will not be supported in this version.
           • Modify PySpark's DataFrame.toPandas method to use the following logic (a rough Python sketch follows this outline):
            if (the schema of the DataFrame does not contain complex types)
                Call collectAsArrow() on the underlying Scala Dataset.
                Pull the resulting buffers of Arrow data over to the Python process.
                Use Arrow's Python APIs to convert the buffers into a single Pandas dataframe.
            else
                Use the existing code as a slow-path conversion.
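
          A hedged Python-side sketch of that dispatch (collectAsArrow and the helper below are assumptions taken from this outline, not a final API):

            import pyarrow as pa
            import pandas as pd

            def to_pandas(df):
                # Fast path: flat schemas only; complex types use the old code.
                if not _has_complex_types(df.schema):  # hypothetical helper
                    # Assumes each block from the JVM is a self-contained Arrow stream.
                    blocks = df._jdf.collectAsArrow()  # byte arrays from the JVM (sketch)
                    tables = [pa.ipc.open_stream(b).read_all() for b in blocks]
                    return pa.concat_tables(tables).to_pandas()
                # Slow path: existing row-at-a-time conversion.
                return pd.DataFrame.from_records(df.collect(), columns=df.columns)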
            
          bryanc Bryan Cutler added a comment -

          I've been working on this with Xusen Yin. We have a very rough WIP branch that I'll link here in case others want to pitch in or review while we are working out the kinks.

          apachespark Apache Spark added a comment -

          User 'BryanCutler' has created a pull request for this issue:
          https://github.com/apache/spark/pull/15821

          icexelloss Li Jin added a comment -

          Bryan Cutler,

           Allow me to introduce myself. I am Li Jin, and I am working with Wes McKinney on pyspark/arrow. I have been looking at this issue recently and don't want to duplicate effort. I think I can help by writing unit tests to validate the Arrow record batches created. What do you think?

          bryanc Bryan Cutler added a comment -

          Hi Li Jin, that sounds great! We could definitely use some help for validation testing.

          bryanc Bryan Cutler added a comment -

          Script for benchmarks

          jnadeau Jacques Nadeau added a comment -

          Anybody know some committers we can get to look at this?

          holdenk holdenk added a comment -

           So I'm following along with the progress on this; I'll try to take a more thorough look this Thursday.

          jnadeau Jacques Nadeau added a comment -

          Great, thanks holdenk!

          cloud_fan Wenchen Fan added a comment -

          Issue resolved by pull request 15821
          https://github.com/apache/spark/pull/15821

          rxin Reynold Xin added a comment -

          Was this done? I thought there are still other data types that are not supported. We should either turn this into an umbrella ticket, or create a new umbrella ticket.

          bryanc Bryan Cutler added a comment -

          That is correct Reynold Xin, this did not have support for complex types or date/timestamp. I created SPARK-21187 as an umbrella to track addition of all remaining types.

          apachespark Apache Spark added a comment -

          User 'BryanCutler' has created a pull request for this issue:
          https://github.com/apache/spark/pull/18459

          jaiseban@gmail.com Jais Sebastian added a comment -

           Hi,
           Do you have any plan to integrate the Arrow format into DataFrameWriter for the Java API? https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/DataFrameWriter.html

           bryanc Bryan Cutler added a comment - edited

           Hi Jais Sebastian, the DataFrameWriter API is for persisting to disk, which is not the intent for Arrow, since it is an in-memory format. It would be possible in the future to add an API that exposes the internal data of a Spark Dataset as Arrow data that could be consumed by another process.

          Tagar Ruslan Dautkhanov added a comment -

           So Apache Arrow would currently be available only in a dataframe.toPandas() call?
           Do you have plans to extend it into a more generic PySpark serializer, like
           https://github.com/apache/spark/blob/branch-2.2/python/pyspark/serializers.py#L22
           for example:

          sc = SparkContext('local', 'test', serializer=ArrowSerializer())
          
          leif Leif Walsh added a comment -

          See SPARK-21190 for a case we're considering for using arrow to move data between the executors and python workers.

          bryanc Bryan Cutler added a comment -

           Hi Ruslan Dautkhanov, the ArrowSerializer doesn't quite fit as a drop-in replacement, because the standard PySpark serializers use iterators over elements while Arrow works on batches. Trying to iterate over the batches to get individual elements would probably cancel out any performance gains, so you would need to operate on the data through an interface like pandas. I proposed something similar in my comment here (some API details). I'd like to hear what your use case is for working with Arrow data and what you'd want to see in Spark to support it.
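
           To illustrate the batch-oriented shape described above (a sketch using today's pyarrow API; arrow_bytes stands in for an assumed stream of record batches):

             import pyarrow as pa

             reader = pa.ipc.open_stream(arrow_bytes)
             for batch in reader:         # yields RecordBatch objects, not rows
                 pdf = batch.to_pandas()  # cheap, vectorized, per-batch conversion
                 # Pulling individual elements back out of pdf row by row would
                 # reintroduce the per-element cost the Arrow path avoids.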

          Tagar Ruslan Dautkhanov added a comment -

           Bryan Cutler, thanks for the feedback. We sometimes have issues with caching dataframes in Spark, so we wanted to see if Arrow could be a better fit in PySpark than Pickle / cPickle for caching dataframes.

           Thanks for the link - I will check that. On a separate note, can Arrow's batched columnar storage still be iterated over for reads? For non-batched writes the PySpark serializer could fall back to a non-Arrow format. So it might be interesting to explore whether two serializers can be active at the same time: batched Arrow, with a fallback to cPickle if necessary.


            People

            • Assignee:
              bryanc Bryan Cutler
            • Reporter:
              wesmckinn Wes McKinney
            • Shepherd:
              Reynold Xin
            • Votes:
              6
            • Watchers:
              76
