  Spark / SPARK-18924

Improve collect/createDataFrame performance in SparkR


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Duplicate
    • Component/s: SparkR

    Description

      SparkR has its own SerDe for data serialization between JVM and R.

      The SerDe on the JVM side is implemented in org.apache.spark.api.r.SerDe (core/src/main/scala/org/apache/spark/api/r/SerDe.scala).

      The SerDe on the R side is implemented in R/pkg/R/serialize.R and R/pkg/R/deserialize.R.

      The serialization between the JVM and R suffers from huge storage and computation overhead. For example, a simple round trip of 1 million doubles surprisingly took about 3 minutes on my laptop:

      > system.time(collect(createDataFrame(data.frame(x=runif(1000000)))))
         user  system elapsed
       14.224   0.582 189.135
      

      Collecting a medium-sized DataFrame to the local R session and continuing with a local R workflow is a use case we should pay attention to. SparkR will never be able to cover all existing features from CRAN packages, and it is unnecessary for Spark to do so because not all features need scalability.

      Several factors contribute to the serialization overhead:
      1. The SerDe on the R side is implemented using high-level R methods.
      2. DataFrame columns are not serialized efficiently, primitive-type columns in particular.
      3. There is some overhead in the serialization protocol/implementation itself.

      1) might have been discussed before, because R packages like rJava existed before SparkR. I'm not sure whether we would have a license issue in depending on those libraries. Another option is to switch to the low-level R/C interface or Rcpp, which again might have license issues. I'm not an expert here. If we have to implement our own, there is still much room for improvement, as discussed below.

      2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`, which collects rows to the driver and then constructs columns. However,

      • it ignores column types, which results in boxing/unboxing overhead, and
      • it collects all values as objects on the driver, which results in high GC pressure.

      A relatively simple change is to implement specialized column builders based on column types, primitive types in particular. We also need to handle null/NA values properly. A simple data structure we can use is

      val size: Int               // total number of rows in the column
      val nullIndexes: Array[Int] // positions of null/NA entries
      val notNullValues: Array[T] // dense non-null values; specialized for primitive types
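
      As a rough sketch of how such a builder could look on the JVM side (the name DoubleColumnBuilder is hypothetical, not existing Spark code), a double column can be accumulated into a primitive array while NA positions are tracked separately:

      // Hypothetical sketch of a type-specialized builder for double columns.
      // Non-null values stay in a primitive Array[Double]; null/NA positions are
      // recorded separately so the dense values can be shipped to R in bulk.
      class DoubleColumnBuilder(initialCapacity: Int = 16) {
        private var values = new Array[Double](initialCapacity)
        private val nullIndexes = scala.collection.mutable.ArrayBuffer.empty[Int]
        private var size = 0        // total rows seen, including nulls
        private var valueCount = 0  // non-null values stored so far

        def append(v: java.lang.Double): Unit = {
          if (v == null) {
            nullIndexes += size
          } else {
            if (valueCount == values.length) {
              values = java.util.Arrays.copyOf(values, values.length * 2)
            }
            values(valueCount) = v.doubleValue()
            valueCount += 1
          }
          size += 1
        }

        // Matches the (size, nullIndexes, notNullValues) structure proposed above.
        def result(): (Int, Array[Int], Array[Double]) =
          (size, nullIndexes.toArray, java.util.Arrays.copyOf(values, valueCount))
      }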
      

      On the R side, we can use `readBin` and `writeBin` to read or write an entire vector in a single call. The speed seems reasonable (on the order of GB/s):

      > x <- runif(10000000) # 1e7, not 1e6
      > system.time(r <- writeBin(x, raw(0)))
         user  system elapsed
        0.036   0.021   0.059
      > system.time(y <- readBin(r, double(), 10000000))
         user  system elapsed
        0.015   0.007   0.024
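
      On the JVM side, the matching piece would be to write the dense values of a column as one length-prefixed binary block, so that `readBin` can decode the whole column in a single call. A minimal sketch, assuming a hypothetical helper `writeDoubleColumn` (not part of Spark's SerDe) and little-endian byte order on both ends:

      import java.io.OutputStream
      import java.nio.{ByteBuffer, ByteOrder}

      object ColumnWriter {
        // Hypothetical helper, not part of Spark's SerDe. The R side could decode with:
        //   n <- readBin(con, integer(), 1, endian = "little")
        //   x <- readBin(con, double(), n, endian = "little")
        def writeDoubleColumn(out: OutputStream, values: Array[Double]): Unit = {
          val buf = ByteBuffer.allocate(4 + values.length * 8).order(ByteOrder.LITTLE_ENDIAN)
          buf.putInt(values.length)          // 4-byte length prefix
          buf.asDoubleBuffer().put(values)   // bulk copy of the primitive array, no boxing
          out.write(buf.array())             // one write for the entire column
          out.flush()
        }
      }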
      

      This is just a proposal that needs to be discussed and formalized, but in general it should be feasible to obtain a 20x or greater performance gain.

            People

              Assignee: Unassigned
              Reporter: Xiangrui Meng (mengxr)
              Votes: 3
              Watchers: 15
