Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.0
    • Component/s: SQL
    • Labels: None

Description

Issue Links

Activity

          apachespark Apache Spark added a comment -

          User 'nburoojy' has created a pull request for this issue:
          https://github.com/apache/spark/pull/8592

          nburoojy Nick Buroojy added a comment -

          I sent a pull request to add these aggregates on the new API; however, I now see that this may be blocked by SPARK-9830 (https://issues.apache.org/jira/browse/SPARK-4366?focusedCommentId=14728451&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14728451).

          Let me know if the next step on this is to wait for the blocking change.

          apachespark Apache Spark added a comment -

          User 'nburoojy' has created a pull request for this issue:
          https://github.com/apache/spark/pull/9526

          marmbrus Michael Armbrust added a comment -

          Issue resolved by pull request 9526
          https://github.com/apache/spark/pull/9526
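
          With this merged, the aggregates are exposed through the DataFrame API in 1.6.0. A minimal sketch of usage (the DataFrame df and its column names are illustrative; in 1.6 these functions are Hive-backed, so a HiveContext may be required):

          ```scala
          // Sketch: the collect_list / collect_set aggregates added in 1.6.0.
          // Assumes a DataFrame `df` with columns "k" and "v" (hypothetical names).
          import org.apache.spark.sql.functions.{collect_list, collect_set}

          val grouped = df.groupBy("k").agg(
            collect_list("v").as("values"),        // keeps duplicates; order not guaranteed
            collect_set("v").as("distinct_values") // drops duplicates
          )
          ```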

          justin.uang Justin Uang added a comment -

          Do we have a plan on how to implement these in native Spark SQL? I imagine that this code will have terrible performance implications, since every time we do update(), we're probably doing a full copy of the array/seq.

          import org.apache.spark.sql.Row
          import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
          import org.apache.spark.sql.types._

          class MyUDAF extends UserDefinedAggregateFunction {
            // One string input column.
            override def inputSchema: StructType = StructType(List(StructField("input", StringType)))

            // The aggregation buffer holds the accumulated list of strings.
            override def bufferSchema: StructType = StructType(List(StructField("list", ArrayType(StringType))))

            override def dataType: DataType = ArrayType(StringType)

            override def deterministic: Boolean = true

            override def initialize(buffer: MutableAggregationBuffer): Unit = {
              buffer.update(0, Seq.empty[String])
            }

            // Prepends each input value; note this copies the whole Seq on every call.
            override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
              buffer.update(0, input.getString(0) +: buffer.getSeq[String](0))
            }

            override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
              buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
            }

            override def evaluate(buffer: Row): Any = buffer.getSeq[String](0)
          }
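
          For reference, a sketch of how such a UDAF is wired up (the DataFrame df, its columns, and the registered function name are all illustrative):

          ```scala
          // Sketch: invoking the UDAF above, either as a Column expression
          // or registered for use from SQL. `sqlContext` and a DataFrame `df`
          // with columns "dept" and "name" are assumed.
          val myUdaf = new MyUDAF

          // Use it directly as an aggregate expression...
          df.groupBy("dept").agg(myUdaf(df("name")).as("names"))

          // ...or register it under a name for SQL queries.
          sqlContext.udf.register("collect_strings", myUdaf)
          sqlContext.sql("SELECT dept, collect_strings(name) FROM people GROUP BY dept")
          ```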
          
          maver1ck Maciej Bryński added a comment -

          Moreover, the version from Hive doesn't work with struct types.
          https://issues.apache.org/jira/browse/SPARK-10605

          justin.uang Justin Uang added a comment -

          Yeah, my workaround has been JSON-ifying the struct into a string first, then doing the aggregate, then unpacking it, which is obviously not ideal. Also, using Hive makes my unit tests take 25 seconds to start up, instead of 3 seconds.

          onetoinfinity@yahoo.com Cristian Opris added a comment -

          Seconded. It looks like MutableAggregationBuffer is not so mutable after all: everything gets converted to Catalyst types and back every time, which makes it impractical to implement anything that collects a larger amount of data to evaluate later.

          hvanhovell Herman van Hovell added a comment -

          You could implement this as an ImperativeAggregate, make sure it does not support partial aggregation (override supportsPartial), and maintain state in the class itself. Look at org.apache.spark.sql.hive.HiveUDAFFunction for an example. It won't be quick, but it should work (as long as the size of the collection doesn't cause OOMEs).


People

  • Assignee: nburoojy Nick Buroojy
  • Reporter: yhuai Yin Huai
  • Votes: 4
  • Watchers: 15

Dates

  • Created:
  • Updated:
  • Resolved:

Development