Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- None
Description
Queries with several joins in a single stage lead to warning logs and possibly incorrect results. One way to trigger the error is to use the union operator.
from pyspark.sql import functions as f
# geo_df is a DataFrame with a geometry column named geom
joined_df = geo_df.alias("a").join(geo_df.alias("b"), f.expr("st_intersects(a.geom, b.geom)"))
joined_df.union(joined_df).count()
Logs:
23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 8
23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 11
23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 12
...
Partitioned joins in Sedona assume that TaskContext.partitionId is the same as the grid id used for partitioning (see JudgementBase::initPartition). That isn't true when Spark runs several joins in a single stage.
In the example above, if each join uses 10 partitions, Spark runs the two joins in a single stage with 20 tasks. The second join gets partition ids 10-19 instead of the expected 0-9, so it can produce incorrect results. If the partition extent isn't found, no deduplication happens. If the id maps to the wrong extent, rows that should be kept can be eliminated.
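The two failure modes can be reproduced without Spark. The following is only a sketch in plain Python, not Sedona's code: the names `stage_task_ids`, `keep_pair`, and `result_count`, the one-row grid, and the half-open reference-point check used for deduplication are all assumptions made for illustration.

```python
from typing import Dict, List, Optional, Tuple

Extent = Tuple[float, float, float, float]  # (min_x, min_y, max_x, max_y)
Point = Tuple[float, float]


def stage_task_ids(partitions_per_join: List[int]) -> List[List[int]]:
    """Number the tasks of one stage consecutively across the joins it contains."""
    ids, start = [], 0
    for count in partitions_per_join:
        ids.append(list(range(start, start + count)))
        start += count
    return ids


def keep_pair(extent: Optional[Extent], ref_point: Point) -> bool:
    """Assumed dedup rule: keep a pair only where its reference point falls in the extent.

    With no extent there is nothing to check against, so the pair is always kept.
    """
    if extent is None:
        return True
    min_x, min_y, max_x, max_y = extent
    x, y = ref_point
    return min_x <= x < max_x and min_y <= y < max_y


# Hypothetical grid for the second join: 10 unit-wide cells, keyed by grid id.
extents: Dict[int, Extent] = {g: (float(g), 0.0, float(g + 1), 1.0) for g in range(10)}

# Case from the description: two joins with 10 partitions each share a stage.
_, second_join_ids = stage_task_ids([10, 10])
missing = [tid for tid in second_join_ids if tid not in extents]

# A matching pair found in grid cells 0 and 1, with its reference point in cell 0.
ref_point = (0.5, 0.5)
found_in_cells = [0, 1]


def result_count(task_id_offset: int) -> int:
    """Count how many copies of the pair survive when the lookup uses the task id."""
    return sum(keep_pair(extents.get(g + task_id_offset), ref_point) for g in found_in_cells)


print(missing)           # task ids of the second join that have no extent
print(result_count(0))   # lookup by the correct grid id
print(result_count(10))  # every lookup misses, so nothing is deduplicated
print(result_count(5))   # lookups hit the wrong extents
```

In this model all of the second join's task ids (10-19) miss the extent lookup, which matches the warnings in the log. With the correct grid id the pair is reported once. With an offset of 10 it is reported twice because no extent is found. With an offset of 5, as would happen if the first join had only 5 partitions, the lookups hit the wrong cells and the pair is dropped entirely.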
Spark UI screenshot: two joins in a single stage.