Description
saveAsTable appears to succeed: the old data is deleted and the new data is written. However, reading the newly created table throws an error.
Error in SQL statement:
java.lang.RuntimeException: java.lang.RuntimeException: could not merge metadata: key org.apache.spark.sql.parquet.row.metadata has conflicting values:
    at parquet.hadoop.api.InitContext.getMergedKeyValueMetaData(InitContext.java:67)
    at parquet.hadoop.api.ReadSupport.init(ReadSupport.java:84)
    at org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.getSplits(ParquetTableOperations.scala:469)
    at parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:245)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$1.getPartitions(newParquet.scala:461)
    ...
If I set spark.sql.parquet.cacheMetadata to false, querying the data works fine.
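As a workaround, the flag can be set through SQLContext.setConf before querying (a minimal sketch, assuming the same sqlContext/sql used in the repro below; this only sidesteps the stale footerCache, it does not fix it):

// Disable the Parquet footer cache so stale cached footers are not
// mixed with the footers of the freshly written files.
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "false")
sql("select * from test").collect.foreach(println)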
Note: the newly created table needs to have more than one file to trigger the bug (if there is only a single file, we will not need to merge metadata).
To reproduce it, try...
import org.apache.spark.sql.SaveMode
import sqlContext._

sql("drop table if exists test")

// We will save to 2 parquet files.
val df1 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
df1.saveAsTable("test", "parquet", SaveMode.Overwrite)
// Warm the FilteringParquetRowInputFormat.footerCache.
sql("select * from test").collect.foreach(println)

// We will save to 4 parquet files.
val df2 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
df2.saveAsTable("test", "parquet", SaveMode.Overwrite)
sql("select * from test").collect.foreach(println)
In this example, the footerCache still holds the two now-stale footers for df1. Since the new test table has four parquet files, we also pick up two fresh footers for df2, and merging the stale and fresh footers' conflicting schema metadata triggers the bug. A sketch of the failing merge follows.
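For illustration, here is a minimal sketch (hypothetical values; not Spark's or parquet-mr's actual code) of why the merge in InitContext.getMergedKeyValueMetaData fails: the key org.apache.spark.sql.parquet.row.metadata stores the table schema, and the stale and fresh footers disagree on it.

// Hypothetical stand-ins for the key/value metadata carried by the footers.
val staleFooterMeta = Map(
  "org.apache.spark.sql.parquet.row.metadata" -> """{"fields":[{"name":"a"}]}""") // schema of df1
val freshFooterMeta = Map(
  "org.apache.spark.sql.parquet.row.metadata" -> """{"fields":[{"name":"b"}]}""") // schema of df2

// Mimic the merge: every key must map to exactly one value across footers.
val merged = (staleFooterMeta.toSeq ++ freshFooterMeta.toSeq)
  .groupBy(_._1)
  .map { case (key, kvs) =>
    val values = kvs.map(_._2).distinct
    if (values.size > 1)
      throw new RuntimeException(
        s"could not merge metadata: key $key has conflicting values: $values")
    key -> values.head
  }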