Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-28546

Why does the File Sink operation of Spark 2.4 Structured Streaming include double-level version validation?



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels:
    • Environment:

      Spark 2.4

      Structured Streaming


      My code is as follows:

      Dataset<Row> dataset = this.sparkSession.readStream().format("kafka")
       .option("kafka.bootstrap.servers", ",")
       .option("subscribe", "myTopic1,myTopic2")
       .option("startingOffsets", "earliest")
      String mdtTempView = "mybasetemp";
       ExpressionEncoder<Row> Rowencoder = this.getSchemaEncoder(new Schema.Parser().parse(baseschema.getValue())); 
       Dataset<Row> parseVal = dataset.select("value").as(Encoders.BINARY())
       .map(new MapFunction<Row>(){
       }, Rowencoder)
       Dataset<Row> queryResult = this.sparkSession.sql("select 。。。 from global_temp." + mdtTempView + " where start_time<>\"\"");
       String savePath= "/user/dx/streaming/data/testapp"; 
       String checkpointLocation= "/user/dx/streaming/checkpoint/testapp";
       StreamingQuery query = queryResult.writeStream().format("parquet")
       .option("path", savePath)
       .option("checkpointLocation", checkpointLocation)
       .partitionBy("month", "day", "hour")
       .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      try {
       } catch (StreamingQueryException e) {


      1) When I first ran it, I found that app could run normally.

      2) Then, for some reason, I deleted the checkpoint directory of structured streaming and did not delete the savepath of sink file which saves HDFS files.

      3) Then restart app, at which time only executor was assigned after app started, and no tasks were assigned. In the log, I found the print message: "INFO streaming. FileStream Sink: Skipping already committed batch 72". Later I looked at the source code and found that the log was from https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L108

      4) The 3) situation lasts for several hours before the DAGScheduler is triggered to divide the DAG, submitStages, submitTasks, and tasks are assigned to the executor.

      Later, I read the https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala code carefully, and realized that in FileStreamSink, a log would be included under savepath/_spark_metadata, if the current batchId<=log. getLatest () will skip saving and output the log directly: logInfo (s "Skipping already committed batch $batchId").


      class FileStreamSink(
       sparkSession: SparkSession,
       path: String,
       fileFormat: FileFormat,
       partitionColumnNames: Seq[String],
       options: Map[String, String]) extends Sink with Logging {
       private val basePath = new Path(path)
       private val logPath = new Path(basePath, FileStreamSink.metadataDir)
       private val fileLog =
       new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
       override def addBatch(batchId: Long, data: DataFrame): Unit = {
         if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
           logInfo(s"Skipping already committed batch $batchId")
         } else {
           // save file to hdfs


      I think that since checkpoint is used, all information control rights should be given to checkpoint, and there should not be a batchId log information record.




            • Assignee:
              yy3b2007com tommy duan
            • Votes:
              0 Vote for this issue
              2 Start watching this issue


              • Created: