Details
- Type: Question
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 3.3.1
- Fix Version/s: None
- Component/s: None
Description
I want to build a fault-tolerant, recoverable Spark job (using Structured Streaming in PySpark) that reads a data stream from Kafka and uses the foreachBatch sink to implement a stateful transformation before writing the resulting data to the actual sink.
The basic structure of my Spark job looks like this:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit

    # KAFKA_SQL, KAFKA_CLNT, and KAFKA_OPTIONS hold my package coordinates
    # and Kafka connection options (omitted here).

    counter = 0

    def batch_handler(df, batch_id):
        # Stand-in for the stateful transformation + writer to the actual sink.
        global counter
        counter += 1
        df.withColumn('counter', lit(counter)).show(truncate=30)

    spark = (SparkSession.builder
             .appName('test.stateful.checkpoint')
             .config('spark.jars.packages', f'{KAFKA_SQL},{KAFKA_CLNT}')
             .getOrCreate())

    source = (spark.readStream
              .format('kafka')
              .options(**KAFKA_OPTIONS)
              .option('subscribe', 'topic-spark-stateful')
              .option('startingOffsets', 'earliest')
              .option('includeHeaders', 'true')
              .load())

    (source
     .selectExpr('CAST(value AS STRING) AS data', 'CAST(timestamp AS STRING) AS time')
     .writeStream
     .option('checkpointLocation', './checkpoints/stateful')
     .foreachBatch(batch_handler)
     .start()
     .awaitTermination())
where the simplified batch_handler function stands in for the stateful transformation and the writer to the actual data sink. For simplicity, I am also using a local folder as the checkpoint location.
This works fine as far as checkpointing of Kafka offsets is concerned. But how can I include the state of my custom batch handler (counter in my simplified example) in the checkpoints so that the job can pick up where it left off after a crash?
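The only workaround I can think of is to persist the handler's state myself, outside of Spark's checkpoint, keyed by the batch_id that foreachBatch passes in (which, as far as I understand, is replayed with the same value after a restart). A rough sketch of that idea, with STATE_PATH as a placeholder location:

    import json
    import os

    from pyspark.sql.functions import lit

    STATE_PATH = './checkpoints/stateful-handler-state.json'  # placeholder location

    def load_state():
        # Restore the previously persisted counter and batch id, if any.
        if os.path.exists(STATE_PATH):
            with open(STATE_PATH) as f:
                return json.load(f)
        return {'counter': 0, 'last_batch_id': -1}

    state = load_state()

    def batch_handler(df, batch_id):
        # After a crash the last batch may be delivered again; skip it so the
        # counter is not bumped twice for the same batch.
        if batch_id <= state['last_batch_id']:
            return
        state['counter'] += 1
        df.withColumn('counter', lit(state['counter'])).show(truncate=30)
        # Persist the state only after the batch has been handled.
        state['last_batch_id'] = batch_id
        with open(STATE_PATH, 'w') as f:
            json.dump(state, f)

But this keeps the handler state outside of the checkpoint and only works because my state is trivially small; what I would really like is for that state to be part of the checkpoint itself.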
The Spark Structured Streaming Guide doesn't say anything on the topic. With the foreach sink I can pass a custom row-handler object, but that seems to support only open, process, and close methods.
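For reference, that row handler looks roughly like this (per the guide), and none of these methods offers a hook for saving or restoring state across restarts:

    class RowHandler:
        def open(self, partition_id, epoch_id):
            # Called once per partition and epoch; return True to process its rows.
            return True

        def process(self, row):
            # Called for each row of the partition.
            pass

        def close(self, error):
            # Called when the partition is done (error is None on success).
            pass

    # used as: df.writeStream.foreach(RowHandler()).start()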
Would it make sense to create a "Request" or even "Feature" ticket to enhance this interface with methods for restoring state from a checkpoint and for exporting state to support checkpointing?
PS: I have posted this on SOF, too. If anyone cares to answer or comment, I'd be happy to upvote their post.