Spark / SPARK-35896

[SS] Include more granular metrics for stateful operators in StreamingQueryProgress



    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.2
    • Fix Version/s: 3.2.0
    • Component/s: Structured Streaming
    • Labels: None


      Currently, the streaming progress is missing a few important stateful operator metrics in StateOperatorProgress. Each stateful operator consists of multiple steps. For example, flatMapGroupsWithState has two major steps: 1) processing the input and 2) timeout processing, which removes expired entries from the state. The main motivation is to track how long each individual step took (timeout processing, watermark processing, etc.) and how much data it processed, so that we can pinpoint bottlenecks and explain why some microbatches are slower than others in the same job.
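
      To make the two steps concrete, here is a toy, single-task model in plain Python that times input processing and timeout processing separately. This is only a sketch, not Spark's implementation: the function `run_microbatch`, the dict-backed state, and the summing update rule are all hypothetical, and the metric names are borrowed from the list in this issue.

```python
import time


def run_microbatch(state, rows, now_ms, timeout_ms):
    """Toy model of one microbatch for a single stateful task.

    `state` maps key -> (value, last_updated_ms). Everything here is a
    hypothetical illustration, not Spark's actual code path.
    """
    metrics = {}

    # Step 1: process the input rows (add new entries or update existing ones).
    start = time.perf_counter()
    for key, value in rows:
        old_value, _ = state.get(key, (0, now_ms))
        state[key] = (old_value + value, now_ms)
    metrics["allUpdatesTimeMs"] = (time.perf_counter() - start) * 1000.0
    metrics["numRowsUpdated"] = len(rows)  # one update counted per input row

    # Step 2: timeout processing. This scans the whole state to find
    # expired entries, so the scan time is included in the removal time.
    start = time.perf_counter()
    expired = [k for k, (_, ts) in state.items() if now_ms - ts > timeout_ms]
    for k in expired:
        del state[k]
    metrics["allRemovalsTimeMs"] = (time.perf_counter() - start) * 1000.0
    metrics["numRowsRemoved"] = len(expired)

    metrics["numRowsTotal"] = len(state)
    return metrics
```

      Keeping the two timers separate is what lets a slow microbatch be attributed to either the update step or the cleanup step.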

      Below are the final metrics common to all stateful operators (the ones in bold-italic are newly proposed). These metrics are in StateOperatorProgress, which is part of StreamingQueryProgress.

      • operatorName - state operator name. Helps identify operator-specific slowness and state store usage patterns. Example: "dedupe" (derived using StateStoreWriter.shortName)
      • numRowsTotal - number of rows in the state store across all tasks in a stage where the operator has executed.
      • numRowsUpdated - number of rows added to or updated in the state store.
      • allUpdatesTimeMs - time taken to add new rows or update existing state store rows across all tasks in a stage where the operator has executed.
      • numRowsRemoved - number of rows deleted from the state store as part of the state cleanup mechanism across all tasks in a stage where the operator has executed. This number helps measure state store deletions and their impact on checkpoint commit and other latencies.
      • allRemovalsTimeMs - time taken to remove rows from the state store as part of state cleanup (this also includes iterating through the entire state store to find which rows to delete) across all tasks in a stage where the operator has executed. If jobs spend significant time here, it may justify a better state store layout that reads only the required rows rather than the entire state store, as is done currently.
      • commitTimeMs - time taken to commit the state store changes to external storage for checkpointing. This is cumulative across all tasks in a stage where this operator has executed.
      • numShufflePartitions - number of shuffle partitions this state operator is part of. Currently, metrics such as times are aggregated across all tasks in a stage where the operator has executed. Having the number of shuffle partitions (which corresponds to the number of tasks) helps us find the average task contribution to each metric.
      • numStateStores - number of state stores in the operator across all tasks in the stage. Some stateful operators have more than one state store (e.g. stream-stream join). Tracking this number helps us find correlations between the number of state store instances and microbatch latency.
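
      As a rough illustration of how numShufflePartitions can be used, the sketch below divides the stage-wide timing totals by the number of partitions to get an average per-task contribution. It works on a plain dict shaped like one entry of the stateOperators list in the progress JSON; the helper `per_task_averages` and the sample values are hypothetical, and the field names follow this proposal, so the names in a released Spark version may differ.

```python
def per_task_averages(op):
    """Divide stage-wide timing totals by the number of tasks.

    `op` is a dict shaped like one state operator entry in the progress
    JSON. Field names follow this proposal and are an assumption.
    """
    partitions = op.get("numShufflePartitions", 0)
    if partitions <= 0:
        return {}  # cannot compute an average without a task count
    timing_fields = ("allUpdatesTimeMs", "allRemovalsTimeMs", "commitTimeMs")
    return {f: op[f] / partitions for f in timing_fields if f in op}


# Hypothetical sample values, not taken from a real job.
sample_operator = {
    "operatorName": "dedupe",
    "numRowsTotal": 1000,
    "numRowsUpdated": 200,
    "allUpdatesTimeMs": 400,
    "numRowsRemoved": 50,
    "allRemovalsTimeMs": 1200,
    "commitTimeMs": 800,
    "numShufflePartitions": 4,
    "numStateStores": 4,
}

averages = per_task_averages(sample_operator)
print(averages)
```

      In PySpark, a dict of this shape could presumably be taken from each element of query.lastProgress["stateOperators"]; comparing the averages across microbatches is one way to see which step accounts for a slow batch.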





            Assignee: Venki Korukanti (vkorukanti)
            Reporter: Venki Korukanti (vkorukanti)


