SPARK-24019: AnalysisException for Window function expression to compute derivative


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.1.1
    • Fix Version/s: None
    • Component/s: SQL
    • Environment: Ubuntu, Spark 2.1.1, standalone.

    Description

      I am currently using Spark 2.1.1.

      I created an expression that uses window functions to compute the derivative of some series data, and analyzing it fails with an AnalysisException.

      I have a simple case that reproduces the error.

      I'm only filing this bug because the error message says "Please file a bug report with this error message, stack trace, and the query."

      Here are the error message and stack trace:

      ((coalesce(lead(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), value#9) - coalesce(lag(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), value#9)) / cast((coalesce(lead(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - coalesce(lag(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), sequence_num#7)) as double)) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS derivative#14 has multiple Window Specifications (ArrayBuffer(windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))).
      
      Please file a bug report with this error message, stack trace, and the query.;
      org.apache.spark.sql.AnalysisException: ((coalesce(lead(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), value#9) - coalesce(lag(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), value#9)) / cast((coalesce(lead(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - coalesce(lag(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), sequence_num#7)) as double)) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS derivative#14 has multiple Window Specifications (ArrayBuffer(windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))).
      Please file a bug report with this error message, stack trace, and the query.;
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
      at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$78.apply(Analyzer.scala:1772)

      And here is a simple unit test that can be used to reproduce the problem:

      import com.mineset.spark.testsupport.SparkTestCase.SPARK_SESSION
      import org.apache.spark.sql.Column
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions._
      import org.scalatest.FunSuite
      import com.mineset.spark.testsupport.SparkTestCase._
      
      
      /**
       * Test to see that window functions work as expected on spark.
       * @author Barry Becker
       */
      class WindowFunctionSuite extends FunSuite {

        val simpleDf = createSimpleData()

        test("Window function for finding derivatives for 2 series") {

          val window = Window.partitionBy("category").orderBy("sequence_num") //.rangeBetween(-1, 1)
          // Consider the three sequential series points (Xlag, Ylag), (X, Y), (Xlead, Ylead).
          // This defines the derivative as (Ylead - Ylag) / (Xlead - Xlag)
          // If the lead or lag points are null, then we fall back on using the middle point.
          val yLead = coalesce(lead("value", 1).over(window), col("value"))
          val yLag = coalesce(lag("value", 1).over(window), col("value"))
          val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num"))
          val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num"))
          val derivative: Column = (yLead - yLag) / (xLead - xLag)

          val resultDf = simpleDf.withColumn("derivative", derivative.over(window))

          resultDf.show()
          assertResult("???") {
            resultDf.collect().map(row => row.mkString(", ")).mkString("\n")
          }
        }

        def createSimpleData() = {
          val data = Seq(
            (1, "a", 2.1),
            (2, "a", 2.4),
            (1, "b", 100.0),
            (3, "a", 3.7),
            (2, "b", 70.0),
            (4, "a", 3.6),
            (3, "b", 60.0))
          SPARK_SESSION.sqlContext.createDataFrame(data).toDF("sequence_num", "category", "value")
        }
      }
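
The error message lists three window specifications: the two ROWS frames that come from lead and lag, plus a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame. The third one appears to come from the outer derivative.over(window) call, which wraps an expression whose parts are already windowed. A possible workaround, offered as a sketch under that assumption and not as a confirmed fix, is to use the arithmetic expression directly as the new column. The object DerivativeSketch, the helper withDerivative, and the local SparkSession below are hypothetical names introduced for illustration; they are not part of the original test.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lag, lead}

object DerivativeSketch {

  // Hypothetical helper (not in the original report): adds a "derivative"
  // column computed as (Ylead - Ylag) / (Xlead - Xlag) per category.
  def withDerivative(df: DataFrame): DataFrame = {
    val window = Window.partitionBy("category").orderBy("sequence_num")

    // Each lead/lag is windowed on its own; fall back to the current row
    // when there is no neighbouring row in the partition.
    val yLead = coalesce(lead("value", 1).over(window), col("value"))
    val yLag = coalesce(lag("value", 1).over(window), col("value"))
    val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num"))
    val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num"))

    // Assumption: no outer .over(window) here, because the operands already
    // carry their own window specifications.
    df.withColumn("derivative", (yLead - yLag) / (xLead - xLag))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; the original test uses SPARK_SESSION.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("derivative-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same sample data as createSimpleData() in the test above.
    val df = Seq(
      (1, "a", 2.1),
      (2, "a", 2.4),
      (1, "b", 100.0),
      (3, "a", 3.7),
      (2, "b", 70.0),
      (4, "a", 3.6),
      (3, "b", 60.0)
    ).toDF("sequence_num", "category", "value")

    withDerivative(df).orderBy("category", "sequence_num").show()
    spark.stop()
  }
}
```

With the sample data, the middle point of series "b" (sequence_num 2) evaluates (60.0 - 100.0) / (3 - 1), so the sketch does not need a second windowing step to see both neighbours. Whether the original expression should be accepted by the analyzer is a separate question that this sketch does not settle.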

       

People

    • Assignee: Unassigned
    • Reporter: Barry Becker (barrybecker4)