Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-38917

StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())

    XMLWordPrintableJSON

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.2
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      StreamingQuery.processAllAvailable() blocks forever when called on a query that contains a mapGroupsWithState operation configured with GroupStateTimeout.ProcessingTimeTimeout(), even when the state function never sets a timeout on any group (as in the sample below).

       

      I think processAllAvailable() should return once all incoming data has been processed AND no existing group state has a timeout currently set.

       

      Sample code to demonstrate this failure follows:

      /**
       * Reproduces SPARK-38917.
       *
       * Runs the same mapGroupsWithState aggregation twice: once with
       * `GroupStateTimeout.NoTimeout()` and once with `GroupStateTimeout.ProcessingTimeTimeout()`.
       * Each run calls `StreamingQuery.processAllAvailable()`.
       *
       * Reported behaviour:
       *  - The NoTimeout query returns from `processAllAvailable()`.
       *  - The ProcessingTimeTimeout query never returns.
       *
       * This happens even though `BugDemo.summateRunningTotal` never sets a timeout on any group
       * state. Results are written to the console sink; nothing is returned.
       */
      def demoSparkProcessAllAvailableBug(): Unit = {
        val localSpark = SparkSession
          .builder()
          .master("local[*]")
          .appName("demoSparkProcessAllAvailableBug")
          .config("spark.driver.host", "localhost")
          .getOrCreate()

        import localSpark.implicits._

        val demoDataStream = MemoryStream[BugDemo.NameNumberData](1, localSpark.sqlContext)
        // Each addData call is appended as its own offset.
        demoDataStream.addData(BugDemo.NameNumberData("Alice", 1))
        demoDataStream.addData(BugDemo.NameNumberData("Bob", 2))
        demoDataStream.addData(BugDemo.NameNumberData("Alice", 3))
        demoDataStream.addData(BugDemo.NameNumberData("Bob", 4))

        // StreamingQuery.processAllAvailable() returns when executing against NoTimeout,
        // but blocks forever when executing against ProcessingTimeTimeout.
        // NOTE(review): presumably because the stateful operator keeps requesting batches to
        // evaluate processing-time timeouts, so the query never reports "no new data" — confirm.
        val timeoutTypes = List(GroupStateTimeout.NoTimeout(), GroupStateTimeout.ProcessingTimeTimeout())

        for (timeoutType <- timeoutTypes) {
          val totalByName = demoDataStream.toDS()
            .groupByKey(_.Name)
            .mapGroupsWithState(timeoutType)(BugDemo.summateRunningTotal)

          val totalByNameQuery = totalByName
            .writeStream
            .format("console")
            .outputMode("update")
            .start()

          try {
            println(s"$timeoutType query starting to processAllAvailable()")
            totalByNameQuery.processAllAvailable()
            println(s"$timeoutType query completed processAllAvailable()")
          } finally {
            // Always stop the query, even if processAllAvailable() throws (e.g. the query
            // failed). Otherwise a running query would be left behind.
            totalByNameQuery.stop()
          }
        }
      }
      }
      
      /** Record types and the state-update function used by the SPARK-38917 reproduction. */
      object BugDemo {

        /**
         * Folds one group's new records into its running total and stores the result as the
         * group's state.
         *
         * No timeout is ever set on `groupState`. Only the stored value is read and replaced.
         * `Number` and `Total` are boxed `java.lang.Integer` values and are unboxed here, so a
         * null value would throw a NullPointerException.
         *
         * @param name       the grouping key
         * @param input      the records for this key delivered in the current batch
         * @param groupState the per-key state holding the total from earlier batches, if any
         * @return the key paired with its updated running total
         */
        def summateRunningTotal(name: String, input: Iterator[NameNumberData], groupState: GroupState[RunningTotal]): NameNumberData = {
          // A key seen for the first time has no stored state and starts from zero.
          val previousTotal: Int = groupState.getOption.fold(0)(_.Total.intValue)
          val updatedTotal: Int =
            input.foldLeft(previousTotal)((sum, record) => sum + record.Number.intValue)

          groupState.update(RunningTotal(updatedTotal))
          NameNumberData(name, updatedTotal)
        }

        /** An input (and output) record: a name and a number to add to that name's total. */
        case class NameNumberData(Name: String, Number: Integer)

        /** The state kept per name: the sum of all numbers seen so far for that name. */
        case class RunningTotal(Total: Integer)
      }

      Attachments

        Activity

          People

            Assignee: Unassigned
            Reporter: Trevor Christman (tchristman)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated: