Apache Sedona / SEDONA-457

Don't write GeometryUDT into org.apache.spark.sql.parquet.row.metadata when writing GeoParquet files


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.5.1

    Description

      Spark SQL primarily uses org.apache.spark.sql.parquet.row.metadata, a key-value entry in the Parquet file footer, to infer the schema of Parquet files. It falls back to the native Parquet schema only when org.apache.spark.sql.parquet.row.metadata is absent.
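
      The footer entry that Spark consults can be inspected directly. Below is a minimal sketch using PyArrow, assuming a local GeoParquet file named example.parquet (a hypothetical path):

      import pyarrow.parquet as pq

      # The Parquet footer's key-value metadata is where Spark embeds its
      # serialized schema; keys and values are raw bytes.
      footer = pq.ParquetFile("example.parquet").metadata.metadata or {}

      spark_schema = footer.get(b"org.apache.spark.sql.parquet.row.metadata")
      if spark_schema is not None:
          # Spark parses this JSON schema first and only falls back to the
          # native Parquet schema when this entry is missing.
          print(spark_schema.decode("utf-8"))
      else:
          print("Entry absent; Spark will use the native Parquet schema.")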

      Writing the schema of DataFrames with GeometryUDT columns into org.apache.spark.sql.parquet.row.metadata may cause compatibility problems with older versions of Apache Sedona. Additionally, there will be a warning when reading such GeoParquet files using vanilla Spark SQL:

      >>> df = spark.read.format("parquet").load('/home/kontinuation/local/iceberg/test_geoparquet_points')
      23/12/27 17:43:56 WARN ParquetFileFormat: Failed to parse and ignored serialized Spark schema in Parquet key-value metadata:
      	{"type":"struct","fields":[{"name":"user_id","type":"long","nullable":true,"metadata":{}},{"name":"speed","type":"double","nullable":true,"metadata":{}},{"name":"geom","type":{"type":"udt","class":"org.apache.spark.sql.sedona_sql.UDT.GeometryUDT","pyClass":"sedona.sql.types.GeometryType","sqlType":"binary"},"nullable":true,"metadata":{}}]}
      java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"user_id","type":"long","nullable":true,"metadata":{}},{"name":"speed","type":"double","nullable":true,"metadata":{}},{"name":"geom","type":{"type":"udt","class":"org.apache.spark.sql.sedona_sql.UDT.GeometryUDT","pyClass":"sedona.sql.types.GeometryType","sqlType":"binary"},"nullable":true,"metadata":{}}]}, [1.1] failure: 'TimestampType' expected but '{' found
      
      {"type":"struct","fields":[{"name":"user_id","type":"long","nullable":true,"metadata":{}},{"name":"speed","type":"double","nullable":true,"metadata":{}},{"name":"geom","type":{"type":"udt","class":"org.apache.spark.sql.sedona_sql.UDT.GeometryUDT","pyClass":"sedona.sql.types.GeometryType","sqlType":"binary"},"nullable":true,"metadata":{}}]}
      ^
      	at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parseString(LegacyTypeStringParser.scala:90)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:521)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:516)
      	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
      	at scala.util.Failure.recover(Try.scala:234)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.deserializeSchemaString(ParquetFileFormat.scala:516)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readSchemaFromFooter$1(ParquetFileFormat.scala:509)
      	at scala.Option.flatMap(Option.scala:271)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:509)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$2(ParquetFileFormat.scala:491)
      	at scala.collection.immutable.Stream.map(Stream.scala:418)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:491)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:484)
      	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:75)
      	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
      	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      	at org.apache.spark.scheduler.Task.run(Task.scala:127)
      	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:750)
      

      We suggest recording geometry columns as binary in org.apache.spark.sql.parquet.row.metadata (rather than as GeometryUDT) for maximum compatibility.
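
      With binary recorded in the serialized schema, vanilla Spark should read the same file without the warning above and expose the geometry column as WKB bytes. A sketch of the expected behaviour (the path is illustrative and the printed schema is what we expect after the change, not a captured run):

      >>> df = spark.read.format("parquet").load('test_geoparquet_points')
      >>> df.printSchema()
      root
       |-- user_id: long (nullable = true)
       |-- speed: double (nullable = true)
       |-- geom: binary (nullable = true)
      >>> # With Sedona registered, the WKB bytes can still be decoded into geometries:
      >>> df.selectExpr("ST_GeomFromWKB(geom) AS geom")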

      Please refer to the discussion in OvertureMaps/data/issues/89 for the background of this proposal.


People

    • Assignee: Unassigned
    • Reporter: Kristin Cowalcijk (kontinuation)
    • Votes: 0
    • Watchers: 1

Dates

    • Created:
    • Updated:
    • Resolved:

Time Tracking

    • Estimated: Not Specified
    • Remaining: 0h
    • Logged: 0.5h