Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
2.1.0
-
None
Description
Reproducer
test("partitioned writing and batch reading with 'basePath'") { val inputData = MemoryStream[Int] val ds = inputData.toDS() val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath var query: StreamingQuery = null try { query = ds.map(i => (i, i * 1000)) .toDF("id", "value") .writeStream .partitionBy("id") .option("checkpointLocation", checkpointDir) .format("parquet") .start(outputDir) inputData.addData(1, 2, 3) failAfter(streamingTimeout) { query.processAllAvailable() } spark.read.option("basePath", outputDir).parquet(outputDir + "/*").show() } finally { if (query != null) { query.stop() } } }
Stack trace
[info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds) [info] java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths: [info] ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637 [info] ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata [info] [info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them. [info] at scala.Predef$.assert(Predef.scala:170) [info] at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133) [info] at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98) [info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156) [info] at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54) [info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55) [info] at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133) [info] at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361) [info] at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160) [info] at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536) [info] at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520) [info] at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292) [info] at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268) [info] at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)