Spark / SPARK-42102

Using checkpoints in Spark Structured Streaming with the foreachBatch sink


Details

    • Type: Question
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.1
    • Fix Version/s: None
    • None

    Description

      I want to build a fault-tolerant, recoverable Spark job (using Structured Streaming in PySpark) that reads a data stream from Kafka and uses the foreachBatch sink to implement a stateful transformation before writing the resulting data to the actual sink.

      The basic structure of my Spark job looks like this:

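      from pyspark.sql import SparkSession
      from pyspark.sql.functions import lit
      
      # KAFKA_SQL, KAFKA_CLNT (Kafka package coordinates) and KAFKA_OPTIONS
      # (Kafka connection options) are defined elsewhere.
      
      # Handler state: held in driver memory only, so it is lost on restart
      # and is not part of the Spark checkpoint.
      counter = 0
      
      def batch_handler(df, batch_id):
        # foreachBatch calls this on the driver once per micro-batch
        global counter
        counter += 1
        df.withColumn('counter', lit(counter)).show(truncate=30)
      
      spark = (SparkSession.builder
        .appName('test.stateful.checkpoint')
        .config('spark.jars.packages', f'{KAFKA_SQL},{KAFKA_CLNT}')
        .getOrCreate())
      
      # Read the topic from the beginning, including record headers
      source = (spark.readStream
        .format('kafka')
        .options(**KAFKA_OPTIONS)
        .option('subscribe', 'topic-spark-stateful')
        .option('startingOffsets', 'earliest')
        .option('includeHeaders', 'true')
        .load())
      
      # The checkpoint records Kafka offsets and completed batches,
      # but not the handler's own state
      (source
        .selectExpr('CAST(value AS STRING) AS data', 'CAST(timestamp AS STRING) AS time')
        .writeStream
        .option('checkpointLocation', './checkpoints/stateful')
        .foreachBatch(batch_handler)
        .start()
        .awaitTermination())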
      

      Here the simplified batch_handler function stands in for the stateful transformation plus the writer to the actual data sink. For simplicity, I am also using a local folder as the checkpoint location.
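
Spark's checkpoint already records which micro-batches completed. The sketch below assumes the file-based layout Spark currently writes to a local or HDFS-compatible checkpoint location: a commits subfolder containing one file per completed batch, named by batch id. That layout is an internal detail, not a documented API. Under that assumption, a job could read the last committed batch id at startup and line up its own handler state with it. The helper name last_committed_batch_id is hypothetical.

```python
import os
from typing import Optional


def last_committed_batch_id(checkpoint_dir: str) -> Optional[int]:
    """Return the highest batch id in the query's commit log, or None.

    Assumes (internal, undocumented layout) that each completed micro-batch
    leaves a file named after its batch id in <checkpoint_dir>/commits.
    """
    commits_dir = os.path.join(checkpoint_dir, "commits")
    if not os.path.isdir(commits_dir):
        return None
    # Skip checksum/temp files such as ".1.crc"; keep purely numeric names
    batch_ids = [int(name) for name in os.listdir(commits_dir) if name.isdecimal()]
    # Compare numerically, so "10" ranks above "9"
    return max(batch_ids) if batch_ids else None
```

For the job above, this would be called as last_committed_batch_id('./checkpoints/stateful') before starting the query.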

      This works fine as far as checkpointing the Kafka offsets is concerned. But how can I include the state of my custom batch handler (counter in my simplified example) in the checkpoints so that the job can pick up where it left off after a crash?
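
One possible workaround today is to snapshot the handler state yourself, keyed by the batch_id that foreachBatch passes in. The Structured Streaming guide notes that foreachBatch gives only at-least-once guarantees and suggests using batchId to deduplicate. After a crash, Spark may therefore re-run the last batch with the same id. If each batch derives its state only from the snapshot of an earlier batch, a replay then reproduces the same result. This is a minimal sketch, not a Spark feature. BatchStateStore, next_counter, and the JSON-file-per-batch layout are all hypothetical, and a real job would point the directory at fault-tolerant storage.

```python
import json
import os
from typing import Any, Dict, Optional


class BatchStateStore:
    """Hypothetical side store: one JSON snapshot of handler state per batch id."""

    def __init__(self, directory: str) -> None:
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, batch_id: int) -> str:
        return os.path.join(self.directory, f"{batch_id}.json")

    def load(self, batch_id: int) -> Optional[Dict[str, Any]]:
        """Return the snapshot saved after batch_id, or None if there is none."""
        try:
            with open(self._path(batch_id)) as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def load_before(self, batch_id: int) -> Dict[str, Any]:
        """Return the newest snapshot from a batch older than batch_id ({} if none)."""
        earlier = [
            int(name[:-5])
            for name in os.listdir(self.directory)
            if name.endswith(".json") and name[:-5].isdecimal() and int(name[:-5]) < batch_id
        ]
        if not earlier:
            return {}
        return self.load(max(earlier)) or {}

    def save(self, batch_id: int, state: Dict[str, Any]) -> None:
        """Write to a temp file first, then atomically rename it over the target."""
        tmp = self._path(batch_id) + ".tmp"
        with open(tmp, "w") as f:
            json.dump(state, f)
        os.replace(tmp, self._path(batch_id))


def next_counter(store: BatchStateStore, batch_id: int) -> int:
    """Hypothetical core of batch_handler: new state depends only on earlier batches."""
    state = store.load_before(batch_id)
    counter = state.get("counter", 0) + 1
    store.save(batch_id, {"counter": counter})
    return counter
```

Inside batch_handler, the global counter would be replaced by counter = next_counter(store, batch_id) before df.withColumn('counter', lit(counter)). A replayed batch id recomputes the same counter rather than incrementing it twice. End-to-end exactly-once still requires the write to the real sink to be idempotent per batch_id.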

      The Spark Structured Streaming Guide says nothing on this topic. With the foreach sink, I can pass a custom row handler object, but it seems to support only the open, process, and close methods.
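
For reference, PySpark's DataStreamWriter.foreach accepts either a function of one row or an object with a process(row) method and optional open(partition_id, epoch_id) and close(error) methods. Spark calls open once per partition and epoch, then process for each row if open returned True, then close. The object is pickled and runs as a copy inside each task on the executors. Any state kept in its fields therefore stays local to that copy, and none of the three hooks writes anything into the query checkpoint. The class name RowCounter is only illustrative.

```python
class RowCounter:
    """Row handler shaped for DataStreamWriter.foreach (open/process/close)."""

    def __init__(self) -> None:
        self.rows_seen = 0

    def open(self, partition_id: int, epoch_id: int) -> bool:
        # Called per partition and epoch; returning False skips this partition
        self.partition_id = partition_id
        self.epoch_id = epoch_id
        self.rows_seen = 0
        return True

    def process(self, row) -> None:
        # Called once per row; the write to the real sink would go here
        self.rows_seen += 1

    def close(self, error) -> None:
        # error is None on success, otherwise the exception raised while processing.
        # There is no hook here for exporting rows_seen into Spark's checkpoint.
        pass
```

In a query it would be attached with df.writeStream.foreach(RowCounter()).start().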

      Would it make sense to create a "Request" or even a "Feature" ticket to enhance this with methods for restoring state from a checkpoint and exporting state for checkpointing?
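
To make the proposal concrete, here is one hypothetical shape such an enhancement could take. None of these names exist in Spark. The idea is a handler that Spark would ask to export its state when committing a batch and to restore that state from the last committed batch on restart. Because foreachBatch calls its argument as func(df, batch_id), a callable object like this should already be usable as a plain batch function today, but Spark would never call export_state or restore_state.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class CheckpointableBatchHandler(ABC):
    """Hypothetical interface; Spark offers no such hook at present."""

    @abstractmethod
    def export_state(self) -> Dict[str, Any]:
        """Return a JSON-serializable snapshot to store alongside the checkpoint."""

    @abstractmethod
    def restore_state(self, state: Dict[str, Any]) -> None:
        """Reinitialize from the snapshot of the last committed batch."""

    @abstractmethod
    def __call__(self, df, batch_id: int) -> None:
        """Process one micro-batch, as a foreachBatch function does today."""


class CounterHandler(CheckpointableBatchHandler):
    """The counter example from the question, expressed against that interface."""

    def __init__(self) -> None:
        self.counter = 0

    def export_state(self) -> Dict[str, Any]:
        return {"counter": self.counter}

    def restore_state(self, state: Dict[str, Any]) -> None:
        self.counter = state.get("counter", 0)

    def __call__(self, df, batch_id: int) -> None:
        self.counter += 1
        # A real handler would transform df and write it to the sink here
```

A restarted job would then create a fresh CounterHandler, call restore_state with the snapshot saved for the last committed batch, and carry on counting from there.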

      PS: I have also posted this on Stack Overflow. If anyone cares to answer or comment there, I'd be happy to upvote their post.


People

    • Assignee: Unassigned
    • Reporter: Kai-Michael Roesner (KaiRoesner)
    • Votes: 0
    • Watchers: 2
