
SPARK-19965: DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.2.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Reproducer

        test("partitioned writing and batch reading with 'basePath'") {
          val inputData = MemoryStream[Int]
          val ds = inputData.toDS()
      
          val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
          val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
      
          var query: StreamingQuery = null
      
          try {
            query =
              ds.map(i => (i, i * 1000))
                .toDF("id", "value")
                .writeStream
                .partitionBy("id")
                .option("checkpointLocation", checkpointDir)
                .format("parquet")
                .start(outputDir)
      
            inputData.addData(1, 2, 3)
            failAfter(streamingTimeout) {
              query.processAllAvailable()
            }
      
            spark.read.option("basePath", outputDir).parquet(outputDir + "/*").show()
          } finally {
            if (query != null) {
              query.stop()
            }
          }
        }
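
      For reference, the streaming query above produces a layout like the following (illustrative; UUIDs and part-file names are abbreviated and assumed). Note that the glob outputDir + "/*" matches the sink's _spark_metadata log directory as well as the id=... partition directories:

        stream.output-.../
          _spark_metadata/              <- FileStreamSink's metadata log
          id=1/part-....snappy.parquet
          id=2/part-....snappy.parquet
          id=3/part-....snappy.parquet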
      

      Stack trace

      [info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
      [info]   java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
      [info] 	***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
      [info] 	***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
      [info] 
      [info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
      [info]   at scala.Predef$.assert(Predef.scala:170)
      [info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
      [info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
      [info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
      [info]   at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
      [info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
      [info]   at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
      [info]   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
      [info]   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
      [info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
      [info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
      [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
      [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
      [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
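
      The assertion fires because PartitioningUtils.parsePartitions sees both the output root and its _spark_metadata child among the globbed paths and cannot reconcile them into a single partition tree. A hedged workaround sketch (not from this ticket): read the output root directly, without a glob, so the batch reader resolves the table through the sink's metadata log rather than inferring partitions from a raw directory listing:

        // Workaround sketch: no glob, no explicit "basePath". Given a single
        // non-glob path whose root contains a _spark_metadata directory,
        // Spark's DataSource reads the table via the sink's metadata log,
        // so partition inference never sees the log directory itself.
        spark.read.parquet(outputDir).show()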
      

          People

            Assignee: Liwei Lin (lwlin)
            Reporter: Shixiong Zhu (zsxwing)
