[SPARK-24315] Multiple streaming jobs detected error causing job failure


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      We are running a simple structured streaming job that reads data from Kafka and writes it to HDFS. At startup, the application intermittently fails with the following error; after a few restarts it eventually starts successfully.

      org.apache.spark.sql.streaming.StreamingQueryException: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1
      === Streaming Query ===
      ....
      at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
              at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
      Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1
              at scala.Predef$.assert(Predef.scala:170)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
              at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
              at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
              at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
              at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
              at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
              at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
              ... 1 more
      

      We have not set any value for `spark.streaming.concurrentJobs`. Our code looks like:

                // read from kafka
                .withWatermark("timestamp", "30 minutes")
                .groupBy(window($"timestamp", "1 hour", "30 minutes"), ...).count()
                // simple select of some fields with casts
                .coalesce(1)
                .writeStream
                .trigger(Trigger.ProcessingTime("2 minutes"))
                .option("checkpointLocation", checkpointDir)
                // write to HDFS
                .start()
                .awaitTermination()
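
      For reference, a self-contained sketch of this kind of pipeline is below; the broker address, topic, selected columns, sink format, and paths are illustrative placeholders, not the actual values from our job:

          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.functions._
          import org.apache.spark.sql.streaming.Trigger

          val spark = SparkSession.builder().appName("kafka-to-hdfs").getOrCreate()
          import spark.implicits._

          // Read from Kafka (requires the spark-sql-kafka-0-10 package).
          val counts = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
            .option("subscribe", "events")                     // placeholder topic
            .load()
            // Simple select of some fields with casts.
            .select($"timestamp", $"value".cast("string").as("payload"))
            .withWatermark("timestamp", "30 minutes")
            .groupBy(window($"timestamp", "1 hour", "30 minutes"))
            .count()
            .coalesce(1)

          // Write the windowed counts to HDFS.
          counts.writeStream
            .format("parquet")                                 // placeholder sink format
            .option("path", "hdfs:///tmp/out")                 // placeholder output path
            .option("checkpointLocation", "hdfs:///tmp/checkpoint")
            .outputMode("append")
            .trigger(Trigger.ProcessingTime("2 minutes"))
            .start()
            .awaitTermination()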
      

      This may also be related to a backlog of data in the Kafka topic waiting to be processed, so the first batch can take much longer than usual (which I think is a fairly common situation). A possible mitigation is sketched below.
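
      If a Kafka backlog is a factor, one way to bound how much the first batch reads is the Kafka source's maxOffsetsPerTrigger option; a minimal sketch of the read side above with that option added (the limit value is an arbitrary placeholder):

          // Cap each micro-batch at ~100000 offsets across all partitions,
          // so a backlog is drained over several short batches instead of
          // one very long first batch. The limit here is a placeholder.
          spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
            .option("subscribe", "events")                     // placeholder topic
            .option("maxOffsetsPerTrigger", "100000")
            .load()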


          People

            Assignee: Unassigned
            Reporter: Marco Gaido (mgaido)
