Spark / SPARK-18591

Replace hash-based aggregates with sort-based ones if inputs already sorted


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.0.2
    • Fix Version/s: None
    • Component/s: SQL

    Description

      Spark currently uses sort-based aggregates only under limited conditions: the cases where it cannot use partial aggregates or hash-based ones.
      However, if the input ordering already satisfies the requirements of sort-based aggregates, they appear to be faster than hash-based ones.

      ./bin/spark-shell --conf spark.sql.shuffle.partitions=1
      
      val df = spark.range(10000000).selectExpr("id AS key", "id % 10 AS value").sort($"key").cache
      
      def timer[R](block: => R): R = {
        val t0 = System.nanoTime()
        val result = block
        val t1 = System.nanoTime()
        println("Elapsed time: " + (t1 - t0) / 1e9 + "s")
        result
      }
      
      timer {
        df.groupBy("key").count().count
      }
      
      // codegen'd hash aggregate
      Elapsed time: 7.116962977s
      
      // non-codegen'd sort aggregate
      Elapsed time: 3.088816662s
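
      To confirm which physical operator the planner picked for the query above, one can print the physical plan in the same spark-shell session (a sketch; the exact plan text varies by Spark version):

```scala
// In the same spark-shell session as above: print the physical plan for the
// aggregation. Per the timings reported here, Spark 2.0.x chooses
// HashAggregate nodes even though the cached input is already sorted on `key`.
df.groupBy("key").count().explain()
```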
      

      Once codegen'd sort-based aggregates are supported (SPARK-16844), the performance gap seems to grow even bigger:

      // codegen'd sort aggregate
      Elapsed time: 1.645234684s
      

      Therefore, it would be better to use sort-based aggregates in this case.
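
      The intuition can be sketched outside Spark: when the input is already sorted on the grouping key, rows for the same key are adjacent, so a count-per-key aggregation needs only a single streaming pass with one running counter and no hash table. A minimal illustrative sketch in plain Scala (this is not Spark's actual SortAggregateExec; `sortedCount` is a hypothetical helper):

```scala
import scala.collection.mutable.ArrayBuffer

// One-pass "sort-based" count aggregation over (key, value) rows already
// sorted by key. A group is finished as soon as the key changes, so no
// per-row hash-map probe is needed (unlike a hash-based aggregate).
def sortedCount(rows: Seq[(Long, Long)]): Seq[(Long, Long)] = {
  val out = ArrayBuffer.empty[(Long, Long)]
  var curKey: Option[Long] = None
  var count = 0L
  for ((key, _) <- rows) {
    if (curKey.contains(key)) {
      count += 1 // still inside the current group
    } else {
      curKey.foreach(k => out += ((k, count))) // emit the finished group
      curKey = Some(key)
      count = 1
    }
  }
  curKey.foreach(k => out += ((k, count))) // emit the last group
  out.toSeq
}
```

      A hash-based aggregate must instead probe and update a map for every row, which is one source of the gap seen in the timings above.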


          People

            Assignee: Unassigned
            Reporter: Takeshi Yamamuro (maropu)
            Votes: 1
            Watchers: 15

            Dates

              Created:
              Updated:
              Resolved: