Apache Sedona
SEDONA-186

collecting result rows of a spatial join query with SELECT * fails with serde error

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.1
    • Fix Version/s: 1.3.0

    Description

      The Problem

      The SQL query that reproduced this bug was:

      scala> spark.sql("SELECT * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect
      
      22/11/01 18:41:08 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.
      java.lang.RuntimeException: Error while decoding: org.locationtech.jts.io.ParseException: Unknown WKB type 184
      createexternalrow(input[0, bigint, true], input[1, binary, true], newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize, input[3, string, true].toString, input[4, string, true].toString, input[5, binary, true], newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize, StructField(id,LongType,true), StructField(wkb,BinaryType,true), StructField(geom,GeometryUDT,true), StructField(location,StringType,true), StructField(quad_key,StringType,true), StructField(wkb,BinaryType,true), StructField(geom,GeometryUDT,true))
        at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:172)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
        ... 47 elided
      Caused by: org.locationtech.jts.io.ParseException: Unknown WKB type 184
        at org.locationtech.jts.io.WKBReader.readGeometry(WKBReader.java:282)
        at org.locationtech.jts.io.WKBReader.read(WKBReader.java:191)
        at org.locationtech.jts.io.WKBReader.read(WKBReader.java:159)
        at org.apache.sedona.sql.utils.GeometrySerializer$.deserialize(GeometrySerializer.scala:49)
        at org.apache.spark.sql.sedona_sql.UDT.GeometryUDT.deserialize(GeometryUDT.scala:42)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:181)
        ... 65 more
      

      Where osm_all_nodes_geom is a DataFrame with 3 columns:

      scala> spark.sql("SELECT * FROM osm_all_nodes_geom").printSchema
      root
       |-- id: long (nullable = true)
       |-- wkb: binary (nullable = true)
       |-- geom: geometry (nullable = true)
      

      ms_buildings_geom is a DataFrame with 4 columns:

      scala> spark.sql("SELECT * FROM ms_buildings_geom").printSchema
      root
       |-- location: string (nullable = true)
       |-- quad_key: string (nullable = true)
       |-- wkb: binary (nullable = true)
       |-- geom: geometry (nullable = true)
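
      For context, the geom columns were presumably decoded from the raw wkb bytes. A minimal setup sketch, assuming Sedona's ST_GeomFromWKB and hypothetical source tables osm_all_nodes and ms_buildings (this step is not shown in the original report):

      // Hypothetical setup, not from the original report: register the two
      // views with a geometry column decoded from the raw WKB bytes.
      // The source table names osm_all_nodes and ms_buildings are assumptions.
      spark.sql("""
        CREATE OR REPLACE TEMP VIEW osm_all_nodes_geom AS
        SELECT id, wkb, ST_GeomFromWKB(wkb) AS geom FROM osm_all_nodes
      """)

      spark.sql("""
        CREATE OR REPLACE TEMP VIEW ms_buildings_geom AS
        SELECT location, quad_key, wkb, ST_GeomFromWKB(wkb) AS geom FROM ms_buildings
      """)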
      

      Other findings

      1. This problem only reproduces when running spatial join queries with SELECT *. If we SELECT specific columns instead, the problem goes away (see the workaround sketch after this list).
      2. .show runs perfectly fine, while .collect or .takeAsList runs into this problem.
      3. This problem cannot be reproduced with a broadcast join, such as
        spark.sql("SELECT /*+ BROADCAST(msb) */ * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect
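
      Until the fix is available, a workaround sketch for finding 1 (table and column names as in the repro above; any explicit projection avoids the broken SELECT * path):

      // Workaround sketch: enumerating the columns explicitly sidesteps the
      // swapped output ordering, so the rows deserialize with the right types.
      spark.sql("""
        SELECT osm.id, osm.geom AS osm_geom, msb.location, msb.geom AS msb_geom
        FROM osm_all_nodes_geom osm
        INNER JOIN ms_buildings_geom msb
        ON ST_Contains(msb.geom, osm.geom)
      """).collect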
        

      Cause of this problem

      When the join query optimizer replaces a join node with a RangeJoinExec or DistanceJoinExec node, the left and right sub-plans of the join may get swapped. The output of RangeJoinExec or DistanceJoinExec then becomes rightOutput ++ leftOutput instead of leftOutput ++ rightOutput. This leads to column type inconsistency when deserializing the result rows of the join query: the deserializer still expects the leftOutput ++ rightOutput schema, so it ends up decoding a plain BinaryType wkb column as a GeometryUDT, which surfaces as the "Unknown WKB type" ParseException above.
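
      A minimal sketch of the underlying invariant (simplified, not the actual Sedona patch): whichever side the planner picks as the build side, the physical join node must keep reporting its output attributes in the original logical left-then-right order.

      import org.apache.spark.sql.catalyst.expressions.Attribute
      import org.apache.spark.sql.execution.SparkPlan

      // Hypothetical helper, not the actual fix: restore the logical
      // left-then-right attribute order after the planner has (possibly)
      // swapped the join children. When `swapped` is true, the `left`
      // parameter holds the logical right sub-plan, and vice versa.
      def joinOutput(left: SparkPlan, right: SparkPlan, swapped: Boolean): Seq[Attribute] =
        if (swapped) right.output ++ left.output
        else left.output ++ right.output

      Both RangeJoinExec and DistanceJoinExec would need to follow this rule for SELECT * results to deserialize correctly.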

      The join query optimizer never swaps the left and right sub-plans when constructing BroadcastIndexJoinExec, so broadcast joins do not have this problem.

People

    Assignee: Unassigned
    Reporter: Kristin Cowalcijk (kontinuation)

Time Tracking

    Time Spent: 1h 40m