[SPARK-6483] Spark SQL UDF (ScalaUdf) is very slow


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.4.0
    • Fix Version/s: 1.4.0
    • Component/s: SQL
    • Labels: None
    • Environment:
      1. Spark version is 1.3.0
      2. 3 nodes, 80 GB RAM / 20 cores each
      3. read 250 GB of Parquet files from HDFS

    Description

      Test case:
      1.
      Register a "floor" UDF with:
      sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)
      then run the SQL "select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)".

      It takes 17 minutes.

      == Physical Plan ==
      Aggregate false, chan#23015,PartialGroup#23500, chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L
      Exchange (HashPartitioning chan#23015,PartialGroup#23500, 54)
      Aggregate true, chan#23015,scalaUDF(ts#23016), chan#23015,*scalaUDF*(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L
      PhysicalRDD chan#23015,ts#23016,size#23023L, MapPartitionsRDD[115] at map at newParquet.scala:562
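
      For reference, a rough Scala sketch of this first test on Spark 1.3 (the HDFS path and context setup are placeholders/assumptions; the table and column names come from the query above):

        // Sketch of test case 1: group by a registered Scala UDF (Spark 1.3-style API).
        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.sql.SQLContext

        val sc = new SparkContext(new SparkConf().setAppName("ScalaUdfTest"))
        val sqlContext = new SQLContext(sc)

        // Placeholder path: load the Parquet data and expose it as the qlogbase3 table.
        sqlContext.parquetFile("hdfs:///path/to/qlogbase3").registerTempTable("qlogbase3")

        // Register the UDF and run the aggregation that groups by floor(ts).
        sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)
        val withUdf = sqlContext.sql(
          "select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)")
        withUdf.collect()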

      2.
      Run the SQL "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)".

      It takes only 5 minutes.

      == Physical Plan ==
      Aggregate false, chan#23015,PartialGroup#23349, chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L
      Exchange (HashPartitioning chan#23015,PartialGroup#23349, 54)
      Aggregate true, chan#23015,(ts#23016 - (ts#23016 % 300)), chan#23015,*(ts#23016 - (ts#23016 % 300))* AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L
      PhysicalRDD chan#23015,ts#23016,size#23023L, MapPartitionsRDD[83] at map at newParquet.scala:562
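
      The second test differs only in the SQL text; with the same assumed setup as above, the corresponding sketch is:

        // Sketch of test case 2: the same aggregation with the expression written inline,
        // so the plan uses built-in arithmetic expressions instead of scalaUDF.
        val inlined = sqlContext.sql(
          "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)")
        inlined.collect()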

      3.
      Use HiveContext with the SQL "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))".

      It takes only 5 minutes, too.

      == Physical Plan ==
      Aggregate false, chan#23015,PartialGroup#23108L, chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L
      Exchange (HashPartitioning chan#23015,PartialGroup#23108L, 54)
      Aggregate true, chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))), chan#23015,*HiveGenericUdf*#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L
      PhysicalRDD chan#23015,ts#23016,size#23023L, MapPartitionsRDD[28] at map at newParquet.scala:562
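
      A sketch of the third test, assuming a HiveContext is created on the same SparkContext (Hive's built-in floor is resolved as HiveGenericUdf, as the plan above shows):

        // Sketch of test case 3: HiveContext resolves floor to Hive's GenericUDFFloor.
        import org.apache.spark.sql.hive.HiveContext

        val hiveContext = new HiveContext(sc)
        // Same placeholder path as in the first sketch.
        hiveContext.parquetFile("hdfs:///path/to/qlogbase3").registerTempTable("qlogbase3")
        val viaHive = hiveContext.sql(
          "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))")
        viaHive.collect()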

      Why is ScalaUdf so slow, and how can it be improved?


          People

            Assignee: Zhichao Zhang (zzcclp)
            Reporter: Zhichao Zhang (zzcclp)
            Votes: 0
            Watchers: 5
