Spark / SPARK-20065

Empty output files created for aggregation query in append mode


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.3.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      I've got a Kafka topic that I'm querying with a windowed aggregation, using a 30-second watermark and a 10-second processing-time trigger, writing out to Parquet in append output mode.

      Every 10-second trigger generates a file, regardless of whether there was any data for that trigger or whether any records were actually finalized by the watermark.

      Is this expected behavior, or should it not write out these empty files?

      import org.apache.spark.sql.functions.{window, date_format}
      import org.apache.spark.sql.streaming.ProcessingTime
      import spark.implicits._

      // Kafka source; connection options elided in the original report
      val df = spark.readStream.format("kafka")....

      // Windowed count over event time, with a 30-second watermark
      val query = df
        .withWatermark("timestamp", "30 seconds")
        .groupBy(window($"timestamp", "10 seconds"))
        .count()
        .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")

      // Append-mode Parquet sink, triggered every 10 seconds
      query
        .writeStream
        .format("parquet")
        .option("checkpointLocation", aggChk)  // aggChk: checkpoint directory path
        .trigger(ProcessingTime("10 seconds"))
        .outputMode("append")
        .start(aggPath)                        // aggPath: output directory path
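      For reference, once the watermark has passed, the windows that were actually finalized can be read back from the sink. A minimal sketch, assuming the same spark-shell session and the same aggPath:

      // Read the committed append-mode output and inspect finalized windows
      val out = spark.read.parquet(aggPath)
      out.orderBy("time").show()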

      As the query executes, do a file listing on "aggPath" and you'll see files of the minimum size, 339 bytes, until the first watermark passes and the initial batch is finalized. Even after that, each trigger that processes an empty batch keeps generating an empty file.
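      One way to verify is to list the sink directory and print file sizes; a minimal sketch, assuming aggPath points at a local filesystem directory (for HDFS or another filesystem you'd use the Hadoop FileSystem API instead):

      import java.nio.file.{Files, Paths}
      import scala.collection.JavaConverters._

      // Print the size of every part file in the output directory;
      // the 339-byte files hold Parquet metadata but no rows
      Files.list(Paths.get(aggPath)).iterator().asScala
        .filter(_.toString.endsWith(".parquet"))
        .foreach(p => println(s"${Files.size(p)}\t${p.getFileName}"))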


          People

            Assignee: Yuanjian Li (XuanYuan)
            Reporter: Silvio Fiorito (sfiorito)
            Votes: 0
            Watchers: 5
