SPARK-15598: Change Aggregator.zero to Aggregator.init


Details

    • Type: Sub-task (parent: SPARK-15426 Spark 2.0 SQL API audit)
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      org.apache.spark.sql.expressions.Aggregator currently requires defining a zero value for the aggregation buffer. This is a limitation that makes it difficult to implement APIs such as reduce: in reduce (or reduceByKey), the user specifies only a single associative and commutative reduce function, and there is no natural zero value.
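
      For reference, the current interface looks roughly like the following (simplified; the encoder members are elided):

      // Current shape of the API (simplified): a zero value must be
      // supplied even when the aggregation has no natural identity.
      abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
        def zero: BUF
        def reduce(b: BUF, a: IN): BUF
        def merge(b1: BUF, b2: BUF): BUF
        def finish(reduction: BUF): OUT
      }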

      A small tweak to the API is to change zero to init, which takes the first input and produces the initial buffer:

      abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
        def init(a: IN): BUF              // seed the buffer from the first input
        def reduce(b: BUF, a: IN): BUF    // fold one input into the buffer
        def merge(b1: BUF, b2: BUF): BUF  // combine two partial buffers
        def finish(reduction: BUF): OUT   // produce the final output
      }
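
      To see why init helps, consider a min aggregator: under the current API it needs a sentinel zero such as Long.MaxValue, whereas with init the first input simply seeds the buffer. A sketch against the proposed interface (not part of the proposal itself):

      // Sketch under the proposed API: no sentinel zero is required;
      // the first input element seeds the aggregation buffer.
      val min = new Aggregator[Long, Long, Long] {
        override def init(a: Long): Long = a
        override def reduce(b: Long, a: Long): Long = math.min(b, a)
        override def merge(b1: Long, b2: Long): Long = math.min(b1, b2)
        override def finish(reduction: Long): Long = reduction
      }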
      

      Then reduce can be implemented in terms of a user-supplied function

      f: (T, T) => T

      as follows:

      new Aggregator[T, T, T] {
        // The first input seeds the buffer, so no zero value is needed.
        override def init(a: T): T = a
        override def reduce(b: T, a: T): T = f(b, a)
        override def merge(b1: T, b2: T): T = f(b1, b2)
        override def finish(reduction: T): T = reduction
      }
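
      One subtlety of dropping zero: an engine must seed each non-empty partition with init, and a completely empty input has no defined result. A minimal local sketch of how a driver could exercise the proposed contract (runLocally is hypothetical, not a Spark API):

      // Hypothetical local driver for the proposed contract: seed each
      // non-empty partition with init, fold with reduce, and combine
      // the partial buffers with merge.
      def runLocally[IN, BUF, OUT](agg: Aggregator[IN, BUF, OUT],
                                   partitions: Seq[Seq[IN]]): OUT = {
        val partials = partitions.filter(_.nonEmpty).map { part =>
          part.tail.foldLeft(agg.init(part.head))((b, a) => agg.reduce(b, a))
        }
        // reduce throws if every partition was empty; without a zero
        // value, an empty input has no defined aggregate.
        agg.finish(partials.reduce((b1, b2) => agg.merge(b1, b2)))
      }

      Driving the reduce aggregator above with f = (x: Int, y: Int) => x + y over partitions Seq(Seq(1, 2), Seq(3, 4)) would yield 10.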
      

          People

            Unassigned Unassigned
            rxin Reynold Xin
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue
