Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Incomplete
Affects Version/s: 2.1.1
Fix Version/s: None
Environment: Ubuntu, spark 2.1.1, standalone.
Description
I am currently using Spark 2.1.1.
I created an expression to compute the derivative of some series data using a window function.
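For reference, the derivative in question is the central difference described in the comments of the test case further down: (Ylead - Ylag) / (Xlead - Xlag), falling back to the current point when a neighbour is missing. The following is a minimal plain-Scala sketch of that arithmetic without Spark, useful only as a cross-check. The names `CentralDifference`, `Point`, and `derivatives` are hypothetical and not part of the original test.

```scala
object CentralDifference {

  /** One series point: x plays the role of sequence_num, y of value. */
  final case class Point(x: Int, y: Double)

  /**
   * (Ylead - Ylag) / (Xlead - Xlag) for each point of a series already sorted by x.
   * A missing lead or lag neighbour falls back to the point itself.
   */
  def derivatives(sorted: Vector[Point]): Vector[Double] =
    sorted.indices.toVector.map { i =>
      val lagPt  = if (i > 0) sorted(i - 1) else sorted(i)
      val leadPt = if (i < sorted.length - 1) sorted(i + 1) else sorted(i)
      // A single-point series gives 0.0 / 0.0 here, which is NaN in plain Scala.
      (leadPt.y - lagPt.y) / (leadPt.x - lagPt.x).toDouble
    }

  def main(args: Array[String]): Unit = {
    // Same rows as createSimpleData() in the test: (sequence_num, category, value).
    val data = Seq(
      (1, "a", 2.1), (2, "a", 2.4), (1, "b", 100.0), (3, "a", 3.7),
      (2, "b", 70.0), (4, "a", 3.6), (3, "b", 60.0))

    // Group by category and order by sequence_num, mirroring partitionBy/orderBy.
    data.groupBy(_._2).toSeq.sortBy(_._1).foreach { case (category, rows) =>
      val series = rows.sortBy(_._1).map { case (x, _, y) => Point(x, y) }.toVector
      series.zip(derivatives(series)).foreach { case (p, d) =>
        println(f"$category ${p.x} ${p.y}%.1f $d%.3f")
      }
    }
  }
}
```

This only mirrors the intended arithmetic. It says nothing about how Spark formats or orders the rows, so it is not a drop-in value for the test's assertion.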
I have a simple reproducible case of the error.
I'm only filing this bug because the error message says "Please file a bug report with this error message, stack trace, and the query."
Here they are:
((coalesce(lead(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), value#9) - coalesce(lag(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), value#9)) / cast((coalesce(lead(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - coalesce(lag(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), sequence_num#7)) as double)) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS derivative#14 has multiple Window Specifications (ArrayBuffer(windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))). 
Please file a bug report with this error message, stack trace, and the query.;
org.apache.spark.sql.AnalysisException: ((coalesce(lead(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), value#9) - coalesce(lag(value#9, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), value#9)) / cast((coalesce(lead(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - coalesce(lag(sequence_num#7, 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), sequence_num#7)) as double)) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS derivative#14 has multiple Window Specifications (ArrayBuffer(windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))). Please file a bug report with this error message, stack trace, and the query.;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$78.apply(Analyzer.scala:1772)
And here is a simple unit test that can be used to reproduce the problem:
import com.mineset.spark.testsupport.SparkTestCase.SPARK_SESSION
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.scalatest.FunSuite
import com.mineset.spark.testsupport.SparkTestCase._

/**
 * Test to see that window functions work as expected on spark.
 * @author Barry Becker
 */
class WindowFunctionSuite extends FunSuite {

  val simpleDf = createSimpleData()

  test("Window function for finding derivatives for 2 series") {
    val window = Window.partitionBy("category").orderBy("sequence_num")//.rangeBetween(-1, 1)

    // Consider the three sequential series points (Xlag, Ylag), (X, Y), (Xlead, Ylead).
    // This defines the derivative as (Ylead - Ylag) / (Xlead - Xlag)
    // If the lead or lag points are null, then we fall back on using the middle point.
    val yLead = coalesce(lead("value", 1).over(window), col("value"))
    val yLag = coalesce(lag("value", 1).over(window), col("value"))
    val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num"))
    val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num"))
    val derivative: Column = (yLead - yLag) / (xLead - xLag)

    val resultDf = simpleDf.withColumn("derivative", derivative.over(window))
    resultDf.show()

    assertResult("???") {
      resultDf.collect().map(row => row.mkString(", ")).mkString("\n")
    }
  }

  def createSimpleData() = {
    val data = Seq(
      (1, "a", 2.1), (2, "a", 2.4), (1, "b", 100.0), (3, "a", 3.7),
      (2, "b", 70.0), (4, "a", 3.6), (3, "b", 60.0))
    SPARK_SESSION.sqlContext.createDataFrame(data).toDF("sequence_num", "category", "value")
  }
}
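The error message lists three window specifications: the two ROWS frames belonging to lead and lag, plus a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW specification wrapped around the whole expression. That outer one appears to come from `derivative.over(window)`, since each lead/lag term has already been given its window. A possible variant, sketched below and not verified against 2.1.1 here, leaves the outer `.over(window)` off. The object name `DerivativeSketch`, the helper `withDerivative`, and the local `SparkSession` are assumptions standing in for the reporter's `SPARK_SESSION` test support.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lag, lead}

object DerivativeSketch {

  /** Adds a "derivative" column; every window function carries exactly one window spec. */
  def withDerivative(df: DataFrame): DataFrame = {
    val window = Window.partitionBy("category").orderBy("sequence_num")

    // Same lead/lag terms as the original test, each applied over the window once.
    val yLead = coalesce(lead("value", 1).over(window), col("value"))
    val yLag  = coalesce(lag("value", 1).over(window), col("value"))
    val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num"))
    val xLag  = coalesce(lag("sequence_num", 1).over(window), col("sequence_num"))

    // No outer .over(window): the arithmetic only combines results of window functions.
    val derivative: Column = (yLead - yLag) / (xLead - xLag)
    df.withColumn("derivative", derivative)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical local session; the original test uses SPARK_SESSION instead.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("derivative-sketch")
      .getOrCreate()

    val data = Seq(
      (1, "a", 2.1), (2, "a", 2.4), (1, "b", 100.0), (3, "a", 3.7),
      (2, "b", 70.0), (4, "a", 3.6), (3, "b", 60.0))
    val df = spark.createDataFrame(data).toDF("sequence_num", "category", "value")

    withDerivative(df).orderBy("category", "sequence_num").show()
    spark.stop()
  }
}
```

Whether the analyzer should reject the nested form or handle it is a separate question. This sketch only shows a way the same derivative might be expressed without nesting one window expression inside another.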