Spark / SPARK-25424

Window duration and slide duration with negative values should fail fast

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      In the TimeWindow class, the window duration and the slide duration should not be allowed to take negative values.

      Currently this behaviour is enforced by Catalyst at analysis time. It could instead be enforced by the constructor of TimeWindow, allowing it to fail fast.

      For example, the code below throws the following error. Note that the error is produced at the time of the count() call instead of the window() call.

      val df = spark.readStream
        .format("rate")
        .option("numPartitions", "2")
        .option("rowsPerSecond", "10")
        .load()
        .filter("value % 20 == 0")
        .withWatermark("timestamp", "10 seconds")
        .groupBy(window($"timestamp", "-10 seconds", "5 seconds"))
        .count()
      

      Error:

      org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, -10000000, 5000000, 0)' due to data type mismatch: The window duration (-10000000) must be greater than 0.;;
      'Aggregate [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0)], [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0) AS window#53, count(1) AS count#57L]
      +- AnalysisBarrier
            +- EventTimeWatermark timestamp#47: timestamp, interval 10 seconds
               +- Filter ((value#48L % cast(20 as bigint)) = cast(0 as bigint))
                  +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.RateSourceProvider@52e44f71, rate, Map(rowsPerSecond -> 10, numPartitions -> 2), [timestamp#47, value#48L], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@221961f2,rate,List(),None,List(),None,Map(rowsPerSecond -> 10, numPartitions -> 2),None), rate, [timestamp#45, value#46L]
      
      	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
      	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
      	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
      	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
      	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
      	at org.apache.spark.sql.RelationalGroupedDataset.toDF(RelationalGroupedDataset.scala:66)
      	at org.apache.spark.sql.RelationalGroupedDataset.count(RelationalGroupedDataset.scala:239)
      	at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply$mcV$sp(HelloScalaTest.scala:39)
      	at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
      	at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
      	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      	at org.scalatest.Transformer.apply(Transformer.scala:22)
      	at org.scalatest.Transformer.apply(Transformer.scala:20)
      	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
      	at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
      	at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
      	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
      	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
      	at com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$runTest(AbstractTest.scala:11)
      	at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
      	at com.hortonworks.qe.AbstractTest.runTest(AbstractTest.scala:11)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
      	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
      	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
      	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
      	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
      	at org.scalatest.Suite$class.run(Suite.scala:1147)
      	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
      	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
      	at com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$run(AbstractTest.scala:11)
      	at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:258)
      	at com.hortonworks.qe.AbstractTest.run(AbstractTest.scala:11)
      	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
      	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
      	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
      	at org.scalatest.tools.Runner$.run(Runner.scala:850)
      	at org.scalatest.tools.Runner.run(Runner.scala)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
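The proposed fail-fast behaviour could be sketched roughly as follows. TimeWindowSketch is a hypothetical stand-in for the real Catalyst TimeWindow class (not its actual implementation); durations are in microseconds, matching the values in the error above. A require check in the constructor would surface the error at the window() call rather than during analysis:

```scala
// Hypothetical sketch only -- TimeWindowSketch stands in for the Catalyst
// TimeWindow expression; this is not the actual Spark implementation.
case class TimeWindowSketch(windowDuration: Long, slideDuration: Long) {
  // Fail fast at construction time instead of waiting for Catalyst analysis.
  require(windowDuration > 0,
    s"The window duration ($windowDuration) must be greater than 0.")
  require(slideDuration > 0,
    s"The slide duration ($slideDuration) must be greater than 0.")
}
```

With such a check, TimeWindowSketch(-10000000L, 5000000L) throws an IllegalArgumentException immediately, before any query plan is built.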
      

      People

            Assignee: Unassigned
            Reporter: Raghav Kumar Gautam (raghavgautam)