Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- None
Description
Spark SQL primarily uses the org.apache.spark.sql.parquet.row.metadata key in the Parquet footer key-value metadata to infer the schema of Parquet files. It falls back to the native Parquet schema only when that key is absent or cannot be parsed.
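For context, the footer key-value metadata that Spark consults can be inspected directly with PyArrow. A minimal sketch follows; the file path is a placeholder for any Parquet part file written by Sedona:

    import pyarrow.parquet as pq

    # Read only the Parquet footer; no row data is loaded.
    md = pq.read_metadata("part-00000.parquet").metadata  # dict of bytes -> bytes
    # Spark's serialized schema, if present, lives under this key.
    print(md.get(b"org.apache.spark.sql.parquet.row.metadata"))
    # GeoParquet's own file-level metadata is stored under the "geo" key.
    print(md.get(b"geo"))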
Writing the schema of DataFrames with GeometryUDT columns into org.apache.spark.sql.parquet.row.metadata may cause compatibility problems with older versions of Apache Sedona. Additionally, vanilla Spark SQL emits a warning when reading such GeoParquet files:
>>> df = spark.read.format("parquet").load('/home/kontinuation/local/iceberg/test_geoparquet_points')
23/12/27 17:43:56 WARN ParquetFileFormat: Failed to parse and ignored serialized Spark schema in Parquet key-value metadata: {"type":"struct","fields":[{"name":"user_id","type":"long","nullable":true,"metadata":{}},{"name":"speed","type":"double","nullable":true,"metadata":{}},{"name":"geom","type":{"type":"udt","class":"org.apache.spark.sql.sedona_sql.UDT.GeometryUDT","pyClass":"sedona.sql.types.GeometryType","sqlType":"binary"},"nullable":true,"metadata":{}}]}
java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"user_id","type":"long","nullable":true,"metadata":{}},{"name":"speed","type":"double","nullable":true,"metadata":{}},{"name":"geom","type":{"type":"udt","class":"org.apache.spark.sql.sedona_sql.UDT.GeometryUDT","pyClass":"sedona.sql.types.GeometryType","sqlType":"binary"},"nullable":true,"metadata":{}}]}, [1.1] failure: 'TimestampType' expected but '{' found
{"type":"struct","fields":[{"name":"user_id","type":"long","nullable":true,"metadata":{}},{"name":"speed","type":"double","nullable":true,"metadata":{}},{"name":"geom","type":{"type":"udt","class":"org.apache.spark.sql.sedona_sql.UDT.GeometryUDT","pyClass":"sedona.sql.types.GeometryType","sqlType":"binary"},"nullable":true,"metadata":{}}]}
^
    at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parseString(LegacyTypeStringParser.scala:90)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:521)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:516)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.deserializeSchemaString(ParquetFileFormat.scala:516)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readSchemaFromFooter$1(ParquetFileFormat.scala:509)
    at scala.Option.flatMap(Option.scala:271)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:509)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$2(ParquetFileFormat.scala:491)
    at scala.collection.immutable.Stream.map(Stream.scala:418)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:491)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:484)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:75)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
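The warning is non-fatal: once the serialized Spark schema is ignored, Spark falls back to converting the native Parquet schema, so the geometry column is expected to surface as a plain binary column in vanilla Spark. The output below is illustrative of that expected fallback, not captured from the session above:

    >>> df.printSchema()
    root
     |-- user_id: long (nullable = true)
     |-- speed: double (nullable = true)
     |-- geom: binary (nullable = true)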
We suggest changing the data types of geometry columns to binary in the schema written to org.apache.spark.sql.parquet.row.metadata for maximum compatibility.
Please refer to the discussion in OvertureMaps/data/issues/89 for the background of this proposal.
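As a rough illustration of the proposal (not the actual Sedona implementation), the schema serialized into org.apache.spark.sql.parquet.row.metadata could have its GeometryUDT fields replaced with plain BinaryType before being written. geometry_to_binary below is a hypothetical helper sketched in PySpark:

    from pyspark.sql.types import BinaryType, StructField, StructType

    def geometry_to_binary(schema: StructType) -> StructType:
        # Replace GeometryUDT (sedona.sql.types.GeometryType) fields with BinaryType
        # so that vanilla Spark and older Sedona versions can parse the schema string.
        fields = []
        for f in schema.fields:
            if type(f.dataType).__name__ == "GeometryType":
                fields.append(StructField(f.name, BinaryType(), f.nullable, f.metadata))
            else:
                fields.append(f)
        return StructType(fields)

    # Example: print the schema JSON that would be stored in the metadata key.
    # print(geometry_to_binary(df.schema).json())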