Spark / SPARK-21546

dropDuplicates with watermark yields RuntimeException due to binding failure

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      With today's master...

      The following streaming query, which combines withWatermark and dropDuplicates, fails with a RuntimeException because an attribute cannot be bound.

      val topic1 = spark.
        readStream.
        format("kafka").
        option("subscribe", "topic1").
        option("kafka.bootstrap.servers", "localhost:9092").
        option("startingoffsets", "earliest").
        load
      
      val records = topic1.
        withColumn("eventtime", 'timestamp).  // alias timestamp under a name that reflects its purpose
        withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds"). // watermark on the renamed eventtime column
        dropDuplicates("value").  // dropDuplicates uses the watermark
                                  // only when the eventTime column exists
        // the select below has to include the watermark column => internal design leak?
        select('key cast "string", 'value cast "string", 'eventtime).
        as[(String, String, java.sql.Timestamp)]
      
      scala> records.explain
      == Physical Plan ==
      *Project [cast(key#0 as string) AS key#169, cast(value#1 as string) AS value#170, eventtime#157-T30000ms]
      +- StreamingDeduplicate [value#1], StatefulOperatorStateInfo(<unknown>,93c3de98-3f85-41a4-8aef-d09caf8ea693,0,0), 0
         +- Exchange hashpartitioning(value#1, 200)
            +- EventTimeWatermark eventtime#157: timestamp, interval 30 seconds
               +- *Project [key#0, value#1, timestamp#5 AS eventtime#157]
                  +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
      
      import org.apache.spark.sql.streaming.{OutputMode, Trigger}
      val sq = records.
        writeStream.
        format("console").
        option("truncate", false).
        trigger(Trigger.ProcessingTime("10 seconds")).
        queryName("from-kafka-topic1-to-console").
        outputMode(OutputMode.Update).
        start
      
      -------------------------------------------
      Batch: 0
      -------------------------------------------
      17/07/27 10:28:58 ERROR Executor: Exception in task 3.0 in stage 13.0 (TID 438)
      org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: eventtime#157-T30000ms
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
      	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:45)
      	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:40)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:977)
      	at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:370)
      	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.org$apache$spark$sql$execution$streaming$WatermarkSupport$$super$newPredicate(statefulOperators.scala:350)
      	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
      	at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
      	at scala.Option.map(Option.scala:146)
      	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.watermarkPredicateForKeys(statefulOperators.scala:160)
      	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys$lzycompute(statefulOperators.scala:350)
      	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys(statefulOperators.scala:350)
      	at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.removeKeysOlderThanWatermark(statefulOperators.scala:167)
      	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.removeKeysOlderThanWatermark(statefulOperators.scala:350)
      	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4$$anonfun$apply$mcV$sp$1.apply$mcV$sp(statefulOperators.scala:403)
      	at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:96)
      	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.timeTakenMs(statefulOperators.scala:350)
      	at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4.apply$mcV$sp(statefulOperators.scala:403)
      	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      	at org.apache.spark.scheduler.Task.run(Task.scala:108)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: Couldn't find eventtime#157-T30000ms in [value#185]
      	at scala.sys.package$.error(package.scala:27)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
      	... 49 more
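
      The "Caused by" line can be reproduced in miniature without Spark. The sketch below is a simplified model of binding an attribute to an ordinal in an input schema by expression ID. Attr, bind and BindingSketch are hypothetical names chosen for illustration and are not Spark's classes; the expression IDs are copied from the trace above.

```scala
// Simplified, hypothetical model of attribute binding; not Spark's actual classes.
object BindingSketch {
  // An attribute is identified by its expression ID, not by its name.
  final case class Attr(name: String, exprId: Long) {
    override def toString: String = s"$name#$exprId"
  }

  // Resolve `attr` to its position in `input`, or report why it cannot be bound.
  def bind(attr: Attr, input: Seq[Attr]): Either[String, Int] = {
    val ordinal = input.indexWhere(_.exprId == attr.exprId)
    if (ordinal == -1) Left(s"Couldn't find $attr in ${input.mkString("[", ",", "]")}")
    else Right(ordinal)
  }

  def main(args: Array[String]): Unit = {
    val eventtime = Attr("eventtime", 157L)
    val value = Attr("value", 185L)
    // The key schema holds only the deduplication column, as in the trace.
    println(bind(eventtime, Seq(value)))
    // With the event-time column among the keys, the lookup succeeds.
    println(bind(eventtime, Seq(value, eventtime)))
  }
}
```

      In this model the first lookup fails because the only available attribute is value#185, which mirrors the message in the trace; the second succeeds once the event-time attribute is part of the input.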
      

      I suspect that watermark support leaks out of StreamingDeduplicate and forces a Spark developer to carry an extra column just for the watermark. I think the filter pushdown applied for the select should either not run in this case or should keep the extra eventTime column, regardless of whether the developer uses it.
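
      A possible workaround, sketched under the assumption that the failure comes from the event-time column being absent from the deduplication keys (as the "Couldn't find eventtime#157-T30000ms in [value#185]" message suggests), is to list the event-time column as a key in dropDuplicates. The built-in rate source stands in for the Kafka topic here so that the sketch is self-contained; the object name, application name and local master are illustrative assumptions. Note that this changes the semantics: rows are deduplicated on the (value, eventtime) pair rather than on value alone.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object DedupWithEventTimeKey {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run for illustration only
      .appName("dedup-with-event-time-key")
      .getOrCreate()

    // Assumption: the "rate" source replaces the Kafka topic from the report.
    // It emits rows with a `timestamp` and a `value` column.
    val records = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .withColumn("eventtime", col("timestamp"))
      .withWatermark("eventtime", "30 seconds")
      // The event-time column is listed as a key, so it is part of the
      // deduplication key schema alongside `value`.
      .dropDuplicates("value", "eventtime")
      .select(col("value").cast("string"), col("eventtime"))

    val query = records.writeStream
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode(OutputMode.Update())
      .start()

    query.awaitTermination()
  }
}
```

      Whether deduplicating on the pair is acceptable depends on the data: two records with the same value but different event times are both kept.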

          People

            Assignee: Shixiong Zhu (zsxwing)
            Reporter: Jacek Laskowski (jlaskowski)