Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
2.1.0
None
Description
I'm reading from a Kafka topic and running a windowed aggregation with a 30-second watermark and a 10-second trigger, writing the results to Parquet in append output mode.
Every 10-second trigger generates a file, regardless of whether there was any data for that trigger or whether any records were actually finalized by the watermark.
Is this expected behavior or should it not write out these empty files?
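import org.apache.spark.sql.functions.{date_format, window}
import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._

val df = spark.readStream.format("kafka")....

// Count events per 10-second window, tolerating data up to 30 seconds late.
val query = df
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window($"timestamp", "10 seconds"))
  .count()
  .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")

// Write finalized windows to Parquet, triggering every 10 seconds.
query
  .writeStream
  .format("parquet")
  .option("checkpointLocation", aggChk)
  .trigger(ProcessingTime("10 seconds"))
  .outputMode("append")
  .start(aggPath)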
As the query executes, list the files under "aggPath" and you'll see files as small as 339 bytes until the first watermark is reached and the initial batch is finalized. Even after that, every trigger whose batch is empty still generates an empty file.
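To make the file listing above repeatable, here is a minimal sketch that reports the suspiciously small Parquet files in the output directory. It assumes "aggPath" is a directory on the local filesystem (an HDFS or S3 path would need the Hadoop FileSystem API instead), and it treats the 339-byte size observed above as the cutoff for an "empty" file. The object and method names (EmptyFileCheck, smallParquetFiles) are hypothetical and are not part of Spark.

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

object EmptyFileCheck {
  // Hypothetical helper: returns (file name, size in bytes) for every
  // regular *.parquet file directly under `dir` whose size is <= maxBytes,
  // sorted by file name.
  def smallParquetFiles(dir: String, maxBytes: Long): List[(String, Long)] = {
    val stream = Files.list(Paths.get(dir))
    try {
      stream.iterator().asScala
        .filter(p => Files.isRegularFile(p) && p.getFileName.toString.endsWith(".parquet"))
        .map(p => (p.getFileName.toString, Files.size(p)))
        .filter { case (_, size) => size <= maxBytes }
        .toList // materialize before the directory stream is closed
        .sortBy(_._1)
    } finally stream.close()
  }
}
```

Calling EmptyFileCheck.smallParquetFiles(aggPath, 339L) after a few triggers should list one entry per empty batch if the behavior described above is occurring; the 339-byte threshold comes from this report and may differ with other schemas or Parquet writer versions.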