Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Version: 1.2.1
Description
The Problem
The SQL query that reproduced this bug was:
scala> spark.sql("SELECT * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect
22/11/01 18:41:08 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.
java.lang.RuntimeException: Error while decoding: org.locationtech.jts.io.ParseException: Unknown WKB type 184
createexternalrow(input[0, bigint, true], input[1, binary, true], newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize, input[3, string, true].toString, input[4, string, true].toString, input[5, binary, true], newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize, StructField(id,LongType,true), StructField(wkb,BinaryType,true), StructField(geom,GeometryUDT,true), StructField(location,StringType,true), StructField(quad_key,StringType,true), StructField(wkb,BinaryType,true), StructField(geom,GeometryUDT,true))
  at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:172)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
  ... 47 elided
Caused by: org.locationtech.jts.io.ParseException: Unknown WKB type 184
  at org.locationtech.jts.io.WKBReader.readGeometry(WKBReader.java:282)
  at org.locationtech.jts.io.WKBReader.read(WKBReader.java:191)
  at org.locationtech.jts.io.WKBReader.read(WKBReader.java:159)
  at org.apache.sedona.sql.utils.GeometrySerializer$.deserialize(GeometrySerializer.scala:49)
  at org.apache.spark.sql.sedona_sql.UDT.GeometryUDT.deserialize(GeometryUDT.scala:42)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:181)
  ... 65 more
Where osm_all_nodes_geom is a DataFrame with 3 columns:
scala> spark.sql("SELECT * FROM osm_all_nodes_geom").printSchema
root
 |-- id: long (nullable = true)
 |-- wkb: binary (nullable = true)
 |-- geom: geometry (nullable = true)
and ms_buildings_geom is a DataFrame with 4 columns:
scala> spark.sql("SELECT * FROM ms_buildings_geom").printSchema
root
 |-- location: string (nullable = true)
 |-- quad_key: string (nullable = true)
 |-- wkb: binary (nullable = true)
 |-- geom: geometry (nullable = true)
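For reference, the column order the failing deserializer expects (the StructField list inside createexternalrow in the trace) is the osm schema followed by the msb schema. The plain-Scala sketch below, with no Spark dependency, spells this out from the two printSchema outputs; the (name, type) pairs are copied from those outputs, and the variable names are illustrative only.

```scala
// (name, type) pairs copied from the two printSchema outputs above.
val osmFields = Seq("id" -> "long", "wkb" -> "binary", "geom" -> "geometry")
val msbFields = Seq(
  "location" -> "string", "quad_key" -> "string",
  "wkb" -> "binary", "geom" -> "geometry")

// For `osm INNER JOIN msb`, SELECT * yields the left relation's columns
// followed by the right relation's columns.
val selectStarFields = osmFields ++ msbFields

// The deserializer in the trace reads fields by ordinal (input[0, ...],
// input[1, ...]), and the names wkb and geom appear twice, so position,
// not name, decides how each field is decoded.
selectStarFields.zipWithIndex.foreach { case ((name, tpe), i) =>
  println(s"$i: $name: $tpe")
}
```

The printed order (id, wkb, geom, location, quad_key, wkb, geom) matches the StructField list in the stack trace.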
Other findings
- This problem only reproduces when running spatial join queries with SELECT *. Selecting specific columns instead makes the problem go away.
- .show works fine, while .collect and .takeAsList fail with this error.
- This problem cannot be reproduced with a broadcast join, such as:
spark.sql("SELECT /*+ BROADCAST(msb) */ * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect
Cause of this problem
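When the join query optimizer replaces a join node with a RangeJoinExec or DistanceJoinExec node, the left and right subplans of the join may get swapped. The output of RangeJoinExec or DistanceJoinExec then becomes rightOutput ++ leftOutput instead of leftOutput ++ rightOutput, while the deserializer still expects the columns in leftOutput ++ rightOutput order (id, wkb, geom, location, quad_key, wkb, geom in the trace above). This leads to column type inconsistencies when deserializing the result rows of the join query: fields are decoded with the wrong types, and the GeometryUDT deserializer can be handed bytes that are not a serialized geometry, which is consistent with the ParseException in the trace.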
The join query optimizer never swaps the left and right subplans when constructing BroadcastIndexJoinExec, so broadcast joins do not have this problem.
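The mismatch caused by the swap can be modeled without Spark. The sketch below is an illustration under assumptions, not Sedona code: a row is a plain Seq[Any], column types use the printSchema names, Geom is a hypothetical stand-in for a JTS geometry, and conforms and unswap are hypothetical helpers. correctRow corresponds to the unswapped (e.g. broadcast) path, and swappedRow to the output of a swapped RangeJoinExec or DistanceJoinExec. unswap shows one way to restore leftOutput ++ rightOutput order; it is not necessarily how the actual fix was implemented.

```scala
// Hypothetical stand-in for a deserialized geometry (Sedona itself uses JTS).
final case class Geom(wkt: String)

// For each position, checks whether the value has the type the schema declares
// there. Type names follow the printSchema output ("long", "string", ...).
def conforms(types: Seq[String], row: Seq[Any]): Seq[Boolean] =
  types.zip(row).map {
    case ("long", _: Long)          => true
    case ("string", _: String)      => true
    case ("binary", _: Array[Byte]) => true
    case ("geometry", _: Geom)      => true
    case _                          => false
  }

val osmTypes = Seq("long", "binary", "geometry")             // id, wkb, geom
val msbTypes = Seq("string", "string", "binary", "geometry") // location, quad_key, wkb, geom
val expectedTypes = osmTypes ++ msbTypes // what the deserializer expects: left ++ right

val osmRow = Seq[Any](1L, Array[Byte](1, 1), Geom("POINT (0 0)"))
val msbRow = Seq[Any]("loc", "qk", Array[Byte](1, 3), Geom("POLYGON ((0 0, 1 0, 1 1, 0 0))"))

val correctRow = osmRow ++ msbRow // leftOutput ++ rightOutput
val swappedRow = msbRow ++ osmRow // rightOutput ++ leftOutput, after the swap

// Hypothetical repair: move the right-side fields back behind the left-side ones.
def unswap(row: Seq[Any], rightWidth: Int): Seq[Any] =
  row.drop(rightWidth) ++ row.take(rightWidth)

println(conforms(expectedTypes, correctRow)) // List(true, true, true, true, true, true, true)
println(conforms(expectedTypes, swappedRow)) // List(false, false, false, false, false, true, true)
println(conforms(expectedTypes, unswap(swappedRow, msbTypes.size)).forall(identity)) // true
```

In this model only the last two positions line up after the swap, and only by coincidence, because both inputs end with wkb and geom columns.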