SPARK-36240: Graceful termination of Spark Structured Streaming queries

Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.2
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Spark Streaming (the DStream API) provides a way to gracefully stop a streaming application using the configuration parameter spark.streaming.stopGracefullyOnShutdown. Its documentation states:

      If true, Spark shuts down the StreamingContext gracefully on JVM shutdown rather than immediately.

      This effectively stops job generation (see JobGenerator in Spark Streaming) and lets the current job (corresponding to a micro-batch) finish, instead of canceling the active job outright.
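
      For reference, a minimal sketch of enabling this for the DStream API (the application name and batch interval are illustrative):

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Ask Spark Streaming to finish the in-flight batch on JVM shutdown
      // instead of cancelling it immediately.
      val conf = new SparkConf()
        .setAppName("graceful-shutdown-demo")
        .set("spark.streaming.stopGracefullyOnShutdown", "true")
      val ssc = new StreamingContext(conf, Seconds(10))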

      Some applications may require graceful stopping so that their output remains consistent: output written out halfway poses serious problems for applications that require "exactly-once" semantics.

      Structured Streaming offers no support for gracefully stopping queries/streaming applications.

      Naive solutions found on the web propose checking whether the query is active using query.isActive (or inspecting the query state directly) and then attempting to call stop() at the right time. With the current implementation of Structured Streaming, stop() cancels the job group, which may lead to inconsistent output because the result still depends on the timing of the cancellation.
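
      A minimal sketch of such a naive approach (the marker-file check is a hypothetical shutdown signal, not a Spark API):

      import java.nio.file.{Files, Paths}

      // `query` is a started org.apache.spark.sql.streaming.StreamingQuery.
      while (query.isActive) {
        if (Files.exists(Paths.get("/tmp/stop-query"))) {
          // stop() cancels the running job group; a micro-batch that is
          // mid-flight here can still be cut off, leaving inconsistent output.
          query.stop()
        }
        Thread.sleep(1000)
      }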

      Proposed solution:

      Strictly speaking, in the context of the micro-batch execution model, a StreamingQuery that we want to gracefully stop would be backed by the MicroBatchExecution implementation. The motivation is similar to that of the StreamingContext's graceful stop: halt the "job generation" and then wait for any active job to finish, instead of canceling the jobs.

      Micro-batch scheduling is managed by a ProcessingTimeExecutor, instantiated in the MicroBatchExecution class:

      private val triggerExecutor = trigger match {
        case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
        case OneTimeTrigger => OneTimeExecutor()
        case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
      }
      

      The following while-true loop is run by the job generation mechanism (ProcessingTimeExecutor.execute). The triggerHandler is a callback that runs a micro-batch and returns false once the query has terminated.

      override def execute(triggerHandler: () => Boolean): Unit = {
        while (true) {
          val triggerTimeMs = clock.getTimeMillis
          val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
          val terminated = !triggerHandler()
          if (intervalMs > 0) {
            val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
            if (batchElapsedTimeMs > intervalMs) {
              notifyBatchFallingBehind(batchElapsedTimeMs)
            }
            if (terminated) {
              return
            }
            clock.waitTillTime(nextTriggerTimeMs)
          } else {
            if (terminated) {
              return
            }
          }
        }
      }
      

      Here, a gracefulStop() signal from the query could essentially tell the ProcessingTimeExecutor to stop triggering new batches.
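
      A minimal sketch of this idea, assuming a hypothetical shouldStop flag and gracefulStop() method on the trigger executor (illustrative names, not Spark's API):

      import java.util.concurrent.atomic.AtomicBoolean

      // Hypothetical variant of ProcessingTimeExecutor.execute with a
      // graceful-stop flag: the loop only exits between micro-batches.
      class GracefulTriggerExecutor(intervalMs: Long) {
        private val shouldStop = new AtomicBoolean(false)

        def gracefulStop(): Unit = shouldStop.set(true)

        def execute(triggerHandler: () => Boolean): Unit = {
          while (true) {
            val triggerTimeMs = System.currentTimeMillis()
            // Run one micro-batch to completion; it is never cancelled.
            val terminated = !triggerHandler()
            // Stop between batches: either the query ended on its own or a
            // graceful stop was requested while the batch was running.
            if (terminated || shouldStop.get()) return
            val batchElapsedMs = System.currentTimeMillis() - triggerTimeMs
            if (batchElapsedMs < intervalMs) Thread.sleep(intervalMs - batchElapsedMs)
          }
        }
      }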

      Another mechanism is then required to wait until any current job is finished; afterwards it would call stop(), and the SparkSession may be stopped as well.
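
      Driver-side, the shutdown sequence could then look roughly like this (executor, query and spark refer to the hypothetical executor above, the running StreamingQuery, and the SparkSession; interaction with Spark's own shutdown hooks is not addressed):

      sys.addShutdownHook {
        executor.gracefulStop()   // stop scheduling new micro-batches
        query.awaitTermination()  // wait for the in-flight batch to finish
        spark.stop()              // finally shut down the SparkSession
      }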

People

    Assignee: Unassigned
    Reporter: Zoltán Zvara
    Votes: 1
    Watchers: 4
