Spark / SPARK-12635

More efficient (column batch) serialization for Python/R

    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: PySpark, SparkR, SQL
    • Labels: None

      Description

      Serialization between Scala / Python / R is pretty slow. Python and R both work pretty well with a column batch interface (e.g. numpy arrays). Technically we should be able to just pass column batches around with minimal serialization (maybe even zero-copy memory).

      Note that this depends on some internal refactoring to use a column batch interface in Spark SQL.
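
      As a rough illustration of the idea (plain base R, not Spark code): a whole numeric column can be written out as one contiguous buffer, which is close to a plain memory copy, whereas per-value serialization pays overhead on every element.

        col <- runif(1e6)                              # one column of one million doubles

        con <- rawConnection(raw(0), open = "wb")
        system.time(writeBin(col, con))                # whole column batch in a single call
        close(con)

        con <- rawConnection(raw(0), open = "wb")
        system.time(for (x in col) writeBin(x, con))   # value-by-value, as a per-element SerDe would
        close(con)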

        Activity

        shivaram Shivaram Venkataraman added a comment -

        cc Sun Rui who is working on the SparkR UDF support

        dselivanov Dmitriy Selivanov added a comment - edited

        Hi!
        First, thanks to all SparkR and Spark developers.
        I have just started to evaluate SparkR. I have tried it several times (ever since it was in AMPLab), but before 1.6 there were too many rough edges, so I used the Scala API. For now I see two main limiting issues (and they are interconnected):

        1. Lack of UDFs in the R interface. I saw SPARK-6817.
        2. And, I think more importantly, the lack of fast serialization / deserialization. I believe it is impossible to develop a useful R UDF interface without it.

        Consider the following example. I have a tiny cached Spark DataFrame with nrow=300k, ncol=25, and I want to collect it into a local R session:

        df_local <- collect(df)
        

        The resulting R data.frame is only ~70 MB(!), but it takes *120 sec* to collect it into R (compared to *7 sec* for df.toPandas() in PySpark).
        I did some profiling. Almost all of the time is spent in the call chain collect -> callJStatic -> invokeJava -> readObject. readObject makes a lot of read* calls from [deserialize.R](https://github.com/apache/spark/blob/c3d505602de2fd2361633f90e4fff7e041849e28/R/pkg/R/deserialize.R).

        So for now it is *much* faster to write the Spark DataFrame to plain CSV/JSON and then read it back into R. I have not looked at the Python serialization code. Is it different from R's? Why is there such a dramatic difference between R and Python?
        cc Sun Rui
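
        (For reference, a minimal sketch of how such a measurement and profile could be reproduced; `df` stands for an already cached SparkDataFrame of roughly the size described above, and the profile file name is arbitrary.)

          Rprof("collect_profile.out")
          elapsed <- system.time(df_local <- collect(df))
          Rprof(NULL)

          print(elapsed)                                          # wall-clock time of collect()
          print(object.size(df_local), units = "Mb")              # size of the resulting local data.frame
          head(summaryRprof("collect_profile.out")$by.total, 10)  # hottest calls (readObject, read*, ...)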

        sunrui Sun Rui added a comment - edited

        Dmitriy Selivanov PySpark uses pickle and CloudPickle on the Python side and net.razorvine.pickle on the JVM side for data serialization/deserialization between Python and the JVM. However, there is no comparable library that can deserialize from and serialize to R's serialization format. So currently SparkR relies on readBin()/writeBin() on the R side and Java's DataInputStream/DataOutputStream on the JVM side for serialization/deserialization between R and the JVM, based on the fact that simple types such as integers, doubles, and byte arrays share the same binary format.

        For collect(), serialization/deserialization happens interleaved with the socket communication. I suspect there is a lot of communication overhead from the many small socket reads/writes. Maybe we can change the behavior to work in batches, that is, serialize part of the collected result into an in-memory buffer and transfer it back in one shot (see the sketch below). Would you be interested in doing a prototype to see whether there is any performance improvement?

        Another idea would be to introduce something like net.razorvine.pickle for R, but that sounds like a lot of effort.
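
        (A rough sketch of the buffering idea on the R side; the function names are illustrative rather than SparkR's actual serialize.R helpers, `con` stands for the socket connection to the JVM backend, and all columns are assumed numeric. Big-endian matches Java's DataOutputStream/DataInputStream.)

          # Current style: many small writes straight to the socket.
          send_per_value <- function(con, df) {
            for (i in seq_len(nrow(df))) {
              for (j in seq_len(ncol(df))) {
                writeBin(as.double(df[[j]][i]), con, endian = "big")
              }
            }
          }

          # Batched style: serialize into an in-memory raw buffer first,
          # then send the length followed by the whole payload in one write.
          send_batched <- function(con, df) {
            buf <- rawConnection(raw(0), open = "wb")
            on.exit(close(buf))
            for (col in df) {
              writeBin(as.double(col), buf, endian = "big")   # whole column at once
            }
            payload <- rawConnectionValue(buf)
            writeBin(length(payload), con, endian = "big")    # payload size header
            writeBin(payload, con)                            # single bulk write
          }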

        dselivanov Dmitriy Selivanov added a comment -

        Thanks for the clarification! I want to give it a try, but I am not sure when I will have enough time.
        I have a question: why not just use the org.rosuda.REngine.REXP class from rJava and avoid creating the data.frame on the JVM side? http://rforge.net/org/doc/
        From a brief look over the SparkR code I can't understand why we reimplement a binary interface between R and Java. Why don't we use an existing one? (I may be missing something.) I also see this year-old thread: https://sparkr.atlassian.net/browse/SPARKR-145, but it ends with no decision.

        shivaram Shivaram Venkataraman added a comment -

        Just to clarify a couple of things - we should probably move this out to a new JIRA issue.

        • The main purpose of creating the SerDe library in SparkR was to enable inter-process communication (IPC) between R and the JVM that is flexible, works on multiple platforms, and does not need too many dependencies. By IPC, I mean having the ability to call methods on the JVM from R. The reason for implementing this in Spark was that we need the flexibility for either R or the JVM to come up first (as opposed to an embedded JVM), and also to make installing / deploying Spark easier.
        • Using the same SerDe mechanism for collect is just a natural extension, and as Spark is primarily tuned for distributed operations we haven't profiled / benchmarked collect performance so far. So your benchmarks are very useful and provide a baseline that we can improve on.
        • In terms of future improvements I see two things: (a) better benchmarks and profiling of the serialization costs (a baseline sketch follows this comment) – we will also need this for the UDF work, since data will similarly be transferred from the JVM to R and back; and (b) designing or adopting a faster serialization format for batch transfers like collect and UDFs.
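
        (A possible baseline harness for point (a); `df` stands for a cached SparkDataFrame, `sqlContext` follows the Spark 1.x SparkR API, and count() is only there to force the R -> JVM transfer. These names are assumptions, not an agreed design.)

          baseline <- list(
            jvm_to_r = system.time(local_df <- collect(df)),            # JVM -> R
            r_to_jvm = system.time({                                    # R -> JVM
              df2 <- createDataFrame(sqlContext, local_df)
              count(df2)
            })
          )
          print(sapply(baseline, `[[`, "elapsed"))                      # elapsed seconds per direction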

          People

          • Assignee: Unassigned
          • Reporter: rxin Reynold Xin
          • Votes: 4
          • Watchers: 20
