Spark / SPARK-20065

Empty output files created for aggregation query in append mode


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.3.0
    • Component/s: Structured Streaming
    • Labels:
      None
    • Target Version/s:

      Description

      I'm querying a Kafka topic and running a windowed aggregation with a 30-second watermark and a 10-second trigger, writing the output to Parquet in append output mode.

      Every 10-second trigger generates a file, regardless of whether that trigger received any data or whether any records were actually finalized by the watermark.

      Is this expected behavior, or should it not write out these empty files?
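The question hinges on how append mode interacts with the watermark: a window's count can only be written once the watermark passes the window's end, so many triggers have no finalized rows at all. The sketch below is a toy model in plain Scala, not Spark's implementation. `AppendModeModel`, its methods, and the simplifications noted in the comments (one batch per trigger, and the watermark applied in the same trigger that advances it) are assumptions made for illustration. It contrasts writing one file per trigger with skipping triggers that emit nothing.

```scala
// Toy model (hypothetical, not Spark internals) of append-mode output for
// 10 s tumbling windows with a 30 s watermark delay.
// Simplifications: one batch per trigger, and the watermark computed from a
// batch is applied in that same trigger.
object AppendModeModel {
  val WindowMs = 10000L // window($"timestamp", "10 seconds")
  val DelayMs  = 30000L // withWatermark("timestamp", "30 seconds")

  /** For each trigger, the window starts that become final (end <= watermark). */
  def emittedPerTrigger(batches: Seq[Seq[Long]]): Seq[Seq[Long]] = {
    var maxEventTime: Option[Long] = None
    var open = Set.empty[Long] // windows whose counts are still held in state
    batches.map { batch =>
      open ++= batch.map(t => t - t % WindowMs) // assign events to window starts
      if (batch.nonEmpty) {
        val batchMax = batch.max
        maxEventTime = Some(maxEventTime.fold(batchMax)(m => math.max(m, batchMax)))
      }
      val watermark = maxEventTime.map(_ - DelayMs)
      val done = open.filter(s => watermark.exists(wm => s + WindowMs <= wm)).toSeq.sorted
      open --= done // finalized windows leave the state
      done
    }
  }

  /** Output files created: one per trigger, or only for triggers with rows. */
  def filesWritten(emitted: Seq[Seq[Long]], skipEmpty: Boolean): Int =
    emitted.count(rows => !skipEmpty || rows.nonEmpty)

  def main(args: Array[String]): Unit = {
    // Event times (ms) arriving in seven consecutive 10 s triggers
    val batches: Seq[Seq[Long]] =
      Seq(Seq(1000L, 5000L), Seq(12000L), Seq(), Seq(45000L), Seq(), Seq(52000L), Seq())
    val emitted = emittedPerTrigger(batches)
    println(filesWritten(emitted, skipEmpty = false)) // 7
    println(filesWritten(emitted, skipEmpty = true))  // 2
  }
}
```

With these sample batches only the fourth and sixth triggers finalize a window, so writing a file on every trigger yields seven files, five of them empty, which mirrors the behavior reported here.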

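      // Spark 2.1 Scala API; assumes a SparkSession named `spark` is in scope.
      import org.apache.spark.sql.functions.{date_format, window}
      import org.apache.spark.sql.streaming.ProcessingTime // Trigger.ProcessingTime from 2.2 on
      import spark.implicits._ // enables the $"column" syntax

      val df = spark.readStream.format("kafka")....

      // Count events per 10-second window. In append mode a window's row is
      // written only after the watermark (max event time - 30 s) passes its end.
      val query = df
        .withWatermark("timestamp", "30 seconds")
        .groupBy(window($"timestamp", "10 seconds"))
        .count()
        .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")

      // aggChk / aggPath: checkpoint and output directories (defined elsewhere)
      query
        .writeStream
        .format("parquet")
        .option("checkpointLocation", aggChk)
        .trigger(ProcessingTime("10 seconds"))
        .outputMode("append")
        .start(aggPath)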
      

      As the query runs, list the files under aggPath: a new 339-byte file appears on every trigger, at least until the watermark first passes the end of a window and the initial batch is finalized. Even after that, every trigger whose batch is empty keeps producing another empty file.
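One way to do that file listing from the JVM is sketched below. It assumes `aggPath` is a local directory and that the sink names its data files `part-*`; the `_spark_metadata` log directory is skipped because only regular files are kept. On HDFS or S3 you would go through Hadoop's `FileSystem` API instead. `OutputListing` and its methods are hypothetical helpers, and `maxBytes` is a threshold you choose: the 339 bytes reported here depends on this query's schema.

```scala
import java.io.File

// Hypothetical helper for inspecting a file sink's output directory.
object OutputListing {
  /** Data files ("part-*") directly under `dir`, sorted by name, with sizes in bytes. */
  def partFiles(dir: File): Seq[(String, Long)] =
    Option(dir.listFiles()).getOrElse(Array.empty[File]).toSeq // null if not a directory
      .filter(f => f.isFile && f.getName.startsWith("part-"))  // skips _spark_metadata/
      .sortBy(_.getName)
      .map(f => (f.getName, f.length()))

  /** Names of data files no larger than `maxBytes` (e.g. the 339-byte files above). */
  def suspectEmpty(dir: File, maxBytes: Long): Seq[String] =
    partFiles(dir).collect { case (name, size) if size <= maxBytes => name }
}
```

For this report, `OutputListing.suspectEmpty(new File(aggPath), 339L)` would return the names of the 339-byte files that accumulate on triggers with nothing to emit.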


            People

            • Assignee:
              XuanYuan Yuanjian Li
              Reporter:
              sfiorito Silvio Fiorito
            • Votes:
              0
              Watchers:
              6

              Dates

              • Created:
                Updated:
                Resolved: