Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.5.0
    • Component/s: SQL
    • Labels: None

Description

Right now, only Hive UDAFs are supported. It would be nice to support UDAFs in the same way as UDFs, i.e. through SQLContext.registerFunction.
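
For illustration, a minimal sketch of the kind of native (non-Hive) UDAF registration being requested, assuming the UserDefinedAggregateFunction interface that eventually shipped in 1.5.0; the sqlContext value, the table name t and its columns k and v are placeholders, not part of this issue:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}

    // A trivial sum-of-longs aggregate, just to show the shape of the API.
    class LongSum extends UserDefinedAggregateFunction {
      def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
      def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
      def dataType: DataType = LongType
      def deterministic: Boolean = true
      def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L }
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
      }
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      }
      def evaluate(buffer: Row): Any = buffer.getLong(0)
    }

    // Registration mirrors sqlContext.udf.register for plain UDFs.
    sqlContext.udf.register("long_sum", new LongSum)
    sqlContext.sql("select k, long_sum(v) from t group by k").show()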

Attachments

    1. logs.zip (3 kB, Milad Bourhani)
    2. spark-udaf.zip (4 kB, Milad Bourhani)
    3. spark-udaf-adapted-1.5.2.zip (4 kB, Milad Bourhani)

Issue Links

Activity

          surajshetiya cynepia added a comment -

          Can someone give an update on where we stand on this issue? Also, would this be supported beyond SQL, for DataFrames?

          surajshetiya cynepia added a comment -

          Also, include GroupedData
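
          For illustration, a sketch of what calling a UDAF through the DataFrame API (GroupedData.agg) could look like. It assumes the UserDefinedAggregateFunction interface and the GeometricMean class shown in Herman van Hovell's comment further down, plus an existing sqlContext and a DataFrame df with store_region and amount columns:

          import org.apache.spark.sql.functions.{avg, callUDF, col}

          val gm = new GeometricMean

          // Apply the UDAF directly as a Column expression inside GroupedData.agg ...
          df.groupBy(col("store_region"))
            .agg(avg(col("amount")), gm(col("amount")))
            .show()

          // ... or go through the registered name with callUDF.
          sqlContext.udf.register("gm", gm)
          df.groupBy(col("store_region"))
            .agg(callUDF("gm", col("amount")))
            .show()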

          maropu Takeshi Yamamuro added a comment -

          See SPARK-4233: we are refactoring the Aggregate interfaces before adding UDAF support.
          https://issues.apache.org/jira/browse/SPARK-4233

          surajshetiya cynepia added a comment -

          Hi Takeshi San,

          Thanks for the quick response. I would like to know if there are any active discussions on the topic. While we refactor the interface for aggregates, we should keep in mind DataFrame, GroupedData, and possibly also sql.dataframe.Column, which looks different from pandas.Series and doesn't currently support any aggregations.

          We would be happy to participate and contribute, if there are any discussions on the same.

          maropu Takeshi Yamamuro added a comment -

          Sorry, but I'm not sure about that issue.
          SPARK-4233 just simplifies and fixes bugs in the Aggregate interface.
          If you'd like to discuss the topic, ISTM you need to open a new JIRA ticket for it.

          apachespark Apache Spark added a comment -

          User 'yhuai' has created a pull request for this issue:
          https://github.com/apache/spark/pull/7458

          milad.bourhani@gmail.com Milad Bourhani added a comment - - edited

          Hi

          I've tried using your GeometricMean example with both a local master (local[2]) and a remote one (spark://...), and it appears to produce different results; am I doing something wrong? I'm connecting to a Cassandra DB.

          I'm attaching a sample Maven project with a README.md explaining how to reproduce the two execution modes:
          spark-udaf.zip

          Thank you.

          hvanhovell Herman van Hovell added a comment -

          Hello Milad,

          Could you be a bit more specific? What is the problem you are having? Is there a difference between local mode and cluster mode? Which version of Spark are you using?

          I have adapted your code:

          import java.math.BigDecimal
          
          import org.apache.spark.sql.expressions.MutableAggregationBuffer
          import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
          import org.apache.spark.sql.Row
          import org.apache.spark.sql.types.{StructType, StructField, DataType, DoubleType, LongType}
          
          class GeometricMean extends UserDefinedAggregateFunction {
            def inputSchema: StructType =
              StructType(StructField("value", DoubleType) :: Nil)
          
            def bufferSchema: StructType = StructType(
              StructField("count", LongType) ::
                StructField("product", DoubleType) :: Nil
            )
          
            def dataType: DataType = DoubleType
          
            def deterministic: Boolean = true
          
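            // Start each aggregation buffer with count = 0 and product = 1.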
            def initialize(buffer: MutableAggregationBuffer): Unit = {
              buffer(0) = 0L
              buffer(1) = 1.0
            }
          
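            // Fold one input value into the buffer: increment the count, multiply the running product.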
            def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
              buffer(0) = buffer.getAs[Long](0) + 1
              buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
            }
          
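            // Combine partial buffers computed on different partitions.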
            def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
              buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
              buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
            }
          
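            // The geometric mean: the count-th root of the running product.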
            def evaluate(buffer: Row): Any = {
              math.pow(buffer.getDouble(1), 1.0d / buffer.getLong(0))
            }
          }
          
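          // Register the UDAF under the name "gm" so it can be called from SQL.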
          sqlContext.udf.register("gm", new GeometricMean)
          
          val df = Seq(
            (1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"),
            (2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"),
            (3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"),
            (4, "italy", "emilia", 42, BigDecimal.valueOf(75 ,0), "jack"),
            (5, "uk", "london", 42, BigDecimal.valueOf(200 ,0), "carl"),
            (6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john")).
            toDF("receipt_id", "store_country", "store_region", "store_id", "amount", "seller_name")
          df.registerTempTable("receipts")
            
          val q = sql("""
          select   store_country,
                   store_region,
                   avg(amount),
                   sum(amount),
                   gm(amount)
          from     receipts
          where    amount > 50
                   and store_country = 'italy'
          group by store_country, store_region
          """)
          
          q.show
          
          // Result (SPARK 1.5.2):
          +-------------+------------+----+--------------------+-----------------+
          |store_country|store_region| _c2|                 _c3|              _c4|
          +-------------+------------+----+--------------------+-----------------+
          |        italy|      emilia|87.5|175.0000000000000...|86.60254037844386|
          |        italy|     toscana|50.5|50.50000000000000...|             50.5|
          |        italy|      puglia|  70|70.00000000000000...|             70.0|
          +-------------+------------+----+--------------------+-----------------+
          

          And I really cannot find a problem.

          milad.bourhani@gmail.com Milad Bourhani added a comment - - edited

          I'm using version 1.5.0 because with 1.5.2 I have this problem:

          java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(Lorg/apache/spark/sql/sources/BaseRelation;)V

          The problem I have is that the results are different depending on the Master mode.

          local[2] (correct):

          [[italy,emilia,87.5,175.000000000000000000,86.60254037844386],
          [italy,toscana,50.5,50.500000000000000000,50.5],
          [italy,puglia,70,70.000000000000000000,70.0]]
          

          spark://ubuntu:7077 (wrong):

          [[italy,emilia,87.5,175.000000000000000000,7.136378562382804E-157],
          [italy,toscana,50.5,50.500000000000000000,5.09278989856E-313],
          [italy,puglia,70,70.000000000000000000,5.09278989856E-313]]
          
          hvanhovell Herman van Hovell added a comment -

          The problem you are having with 1.5.2 is probably caused by conflicting versions of Spark on your classpath, or by a library referencing a different version of Spark.

          I cannot reproduce your problem on 1.5.2 (local cluster) or on master (local and local cluster).

          milad.bourhani@gmail.com Milad Bourhani added a comment - - edited

          Well, then I guess it's a problem with either Spark 1.5.0 or (maybe more likely) spark-cassandra-connector_2.10 version 1.5.0-M2. Just for the record, this is the latest release available on the Maven repo, and this version depends on Spark 1.5.0 – that is why I'm using that version; also, as you pointed out, I guess that using Spark 1.5.2 causes conflicts with version 1.5.0 (coming from the dependency).

          Anyway, thanks. If I can try this test case against 1.5.2, I'll report the result here (or let me know in case I should use another means of communication). Thank you for your time.

          milad.bourhani@gmail.com Milad Bourhani added a comment - - edited

          I managed to run your code against both 1.5.2 and 1.5.0:

          • works fine on 1.5.2
          • differs on 1.5.0, where using a cluster the following result is computed:
            +-------------+------------+----+--------------------+--------------------+
            |store_country|store_region| _c2|                 _c3|                 _c4|
            +-------------+------------+----+--------------------+--------------------+
            |        italy|      emilia|87.5|175.0000000000000...|6.18028512604618E...|
            |        italy|     toscana|50.5|50.50000000000000...|                50.5|
            |        italy|      puglia|  70|70.00000000000000...|                70.0|
            +-------------+------------+----+--------------------+--------------------+
            

          EDIT – Running the example on 1.5.2 multiple times, the error sometimes showed up there too, and every time this happened there was an error in the Worker's log identical to SPARK-9844, so it looks like that log error makes the cluster computation fail (and the result changes, too):

          +-------------+------------+----+--------------------+--------------------+
          |store_country|store_region| _c2|                 _c3|                 _c4|
          +-------------+------------+----+--------------------+--------------------+
          |        italy|      emilia|87.5|175.0000000000000...|7.136378562382805...|
          |        italy|     toscana|50.5|50.50000000000000...|                50.5|
          |        italy|      puglia|  70|70.00000000000000...|                70.0|
          +-------------+------------+----+--------------------+--------------------+
          

          Note also that my conf/spark-env.sh contains this single line:

          export SPARK_WORKER_INSTANCES=1
          

          so I find it strange that a race condition occurs (SPARK-9844 talks about race conditions).

          yhuai Yin Huai added a comment - - edited

          Milad Bourhani Is it possible for you to attach the worker logs (or a part of them)? Also, it seems SPARK-9844 is triggered when we shut down the worker. In your case, did you observe that the worker got shut down? How about we create a new JIRA to investigate this problem?

          Update: It seems a SparkWorker shutdown means an executor shutdown. In your case, did you see an executor get killed and a new executor get launched?

          yhuai Yin Huai added a comment -

          Also, for your environment, are you running standalone mode, YARN, or Mesos?

          yhuai Yin Huai added a comment -

          OK. I can reproduce the issue in the 1.5 branch. I have created https://issues.apache.org/jira/browse/SPARK-11885. I could not reproduce it in branch 1.6 (it is quite easy to reproduce in 1.5), so I think it is a 1.5 issue.

          milad.bourhani@gmail.com Milad Bourhani added a comment -

          I was running standalone mode, and the log is identical to SPARK-9844; that's why I didn't attach it, sorry.

          Anyway, basically I just unzipped the Spark 1.5.2 pre-built distribution for Hadoop 2.6 (though I don't have Hadoop on my machine), set the number of workers to 1 in spark-env.sh, and launched the adapted code against it.

          I'll attach the logs first thing on Monday (sorry I don't have them at the moment). I'm now attaching the project with the adapted code – spark-udaf-adapted-1.5.2.zip.

          milad.bourhani@gmail.com Milad Bourhani added a comment -

          Just for completeness, I'm attaching logs.zip. For the record, it looks as if the first time you run the clustered computation (right after you start ./sbin/start-all.sh), the computation is OK, even though the race condition error shows up in the log. After that, it fails. So the attached logs contain exactly two executions: the first gives a correct answer, the second doesn't.
          To reproduce, run these commands in the unzipped project spark-udaf-adapted-1.5.2.zip:

          mvn clean install
          java -jar `ls target/uber*.jar` `ls target/uber*.jar` spark://master_host:7077
          java -jar `ls target/uber*.jar` `ls target/uber*.jar` spark://master_host:7077
          

          where spark://master_host:7077 is your master URL.


People

    • Assignee: yhuai Yin Huai
    • Reporter: pllee Pei-Lun Lee
    • Votes: 20
    • Watchers: 33

Dates

    • Created:
    • Updated:
    • Resolved:

Development