  1. Spark
  2. SPARK-34806

Helper class for batch Dataset.observe()



    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.2.0
    • Fix Version/s: 3.3.0
    • Component/s: SQL


      The observe method was added to the Dataset API in 3.0.0. It allows collecting aggregate metrics over the data of a Dataset while that data is being processed during an action.

      These metrics are collected in a separate thread after registering a QueryExecutionListener for batch datasets or a StreamingQueryListener for streaming datasets. In a streaming context it makes perfect sense to process incremental metrics in an event-based fashion. For simple batch dataset processing, however, a single result should be retrievable without registering listeners or handling threads.

      Introducing an Observation helper class can hide that complexity for simple use-cases in batch processing.

      Similar to the AccumulatorV2 instances provided by SparkContext (e.g. SparkContext.longAccumulator), the SparkSession can provide a method that creates a new Observation instance and registers it with the session.

      Alternatively, an Observation could be instantiated on its own and register itself with Dataset.sparkSession when Observation.on(Dataset) is called. This "registration" adds a listener to the session, and that listener retrieves the metrics.

      The Observation class provides methods to retrieve the metrics. Retrieval has to wait until the listener has been called in a separate thread, so all of these methods block, optionally with a timeout:

      • Observation.get waits without timeout and returns the metric.
      • Observation.option(time, unit) waits at most time, returns the metric as an Option, or None when the timeout occurs.
      • Observation.waitCompleted(time, unit) waits for the metrics and indicates timeout by returning false.
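
      The waiting behaviour of the three methods above can be modelled without Spark. The following is only a minimal sketch in plain Python: ObservationSketch, on_finish and the seconds-based timeout_s parameter are hypothetical names invented for illustration (the proposal uses a time and unit pair and returns a Scala Option), and a timer thread stands in for the session listener.

```python
import threading


class ObservationSketch:
    """Toy stand-in for the proposed helper class; this is not Spark code."""

    def __init__(self):
        self._done = threading.Event()  # set once the listener has delivered metrics
        self._metrics = None

    def on_finish(self, metrics):
        """Hypothetical callback, invoked from the listener thread after an action."""
        self._metrics = metrics
        self._done.set()

    def get(self):
        """Wait without a timeout, then return the metrics."""
        self._done.wait()
        return self._metrics

    def option(self, timeout_s):
        """Wait at most timeout_s seconds; return the metrics, or None on timeout."""
        return self._metrics if self._done.wait(timeout_s) else None

    def wait_completed(self, timeout_s):
        """Wait at most timeout_s seconds; return False if the timeout expired."""
        return self._done.wait(timeout_s)


observation = ObservationSketch()

# No action has run yet, so both timed calls give up after the timeout.
timed_out_result = observation.option(0.05)
timed_out_flag = observation.wait_completed(0.05)

# Simulate the listener firing on another thread shortly after an action ends.
listener = threading.Timer(0.05, observation.on_finish, args=({"rows": 3},))
listener.start()
metrics = observation.get()  # blocks until the simulated listener has run
listener.join()
```

      A single event object is enough here because all three methods wait for the same condition and differ only in how they report a timeout.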

      An action has to be called on the observed dataset before any of these methods are called. Otherwise the timed methods run into their timeout, and Observation.get, which has no timeout, keeps waiting.

      With Observation.reset, another action can be observed. Finally, Observation.close unregisters the listener from the session.
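
      The lifecycle described here (register, observe, reset, observe again, close) can be sketched in the same spirit. This is a standalone Python model, not the Spark implementation: FakeSession, run_action and ObservationSketch are hypothetical names, and ignoring later actions until reset is called is an assumption read into the description rather than something the issue states.

```python
import threading


class FakeSession:
    """Hypothetical stand-in for a session that keeps a list of listeners."""

    def __init__(self):
        self.listeners = []

    def run_action(self, metrics):
        # Notify listeners on a separate thread, as the issue describes.
        worker = threading.Thread(
            target=lambda: [notify(metrics) for notify in list(self.listeners)]
        )
        worker.start()
        return worker


class ObservationSketch:
    """Toy model of the register / reset / close lifecycle."""

    def __init__(self, session):
        self._session = session
        self._done = threading.Event()
        self._metrics = None
        session.listeners.append(self._on_finish)  # the "registration" step

    def _on_finish(self, metrics):
        # Assumption: once a result is held, later actions are ignored until reset().
        if not self._done.is_set():
            self._metrics = metrics
            self._done.set()

    def get(self):
        self._done.wait()
        return self._metrics

    def reset(self):
        """Forget the previous result so that another action can be observed."""
        self._metrics = None
        self._done.clear()

    def close(self):
        """Unregister the listener from the session."""
        self._session.listeners.remove(self._on_finish)


session = FakeSession()
observation = ObservationSketch(session)

session.run_action({"rows": 3}).join()
first = observation.get()

observation.reset()
session.run_action({"rows": 5}).join()
second = observation.get()

observation.close()
remaining_listeners = len(session.listeners)
```

      Calling close matters because the listener otherwise stays registered with the session for as long as the session lives.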





              enricomi Enrico Minack