Uploaded image for project: 'Apache Sedona'
  1. Apache Sedona
  2. SEDONA-60

Failing join if one side has only one row

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.1.0
    • None

    Description

      We came across an issue when trying to join a one-row dataframe with another (with more rows).

      Example:

      left = spark.sql("SELECT St_PolygonFromText('1,1,1,0,0,0,0,1,1,1', ',') as polygon")
      
      right = spark.createDataFrame([(x,) for x in range(100)], schema=["x"]).selectExpr("St_Point(x,x) as point")
      
      left.createOrReplaceTempView("left")
      right.createOrReplaceTempView("right")
      
      spark.sql("SELECT * FROM left JOIN right ON ST_Intersects(left.polygon, right.point)").count()
      

      Produces:

      Py4JJavaError: An error occurred while calling o924.count. : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: ShuffleQueryStage 0, Statistics(sizeInBytes=1.06E+37 B) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#477333] +- *(3) HashAggregate(keys=[], functions=[partial_count(1) AS count#1039632L], output=[count#1039632L]) +- *(3) Project +- RangeJoin polygon#1039617: geometry, point#1039621: geometry, true :- Project [st_polygonfromtext(1,1,1,0,0,0,0,1,1,1, ,) AS polygon#1039617] : +- *(1) Scan OneRowRelation[] +- Project [st_point(x#1039619L, x#1039619L) AS point#1039621] +- *(2) Scan ExistingRDD[x#1039619L] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:192) at org.apache.spark.sql.execution.adaptive.QueryStageExec.$anonfun$materialize$1(QueryStageExec.scala:87) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236) at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:87) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4(AdaptiveSparkPlanExec.scala:286) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4$adapted(AdaptiveSparkPlanExec.scala:284) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:284) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:270) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:372) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:79) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88) at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75) at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62) at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:496) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:495) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:399) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:374) at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:389) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:365) at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3062) at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3061) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3789) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3787) at org.apache.spark.sql.Dataset.count(Dataset.scala:3061) at sun.reflect.GeneratedMethodAccessor616.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at py4j.Gateway.invoke(Gateway.java:295) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:251) at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.IllegalArgumentException: Number of partitions must be >= 0 at org.apache.sedona.core.spatialRDD.SpatialRDD.spatialPartitioning(SpatialRDD.java:216) at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryBase.doSpatialPartitioning(TraitJoinQueryBase.scala:72) at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryBase.doSpatialPartitioning$(TraitJoinQueryBase.scala:70) at org.apache.spark.sql.sedona_sql.strategy.join.RangeJoinExec.doSpatialPartitioning(RangeJoinExec.scala:37) at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryExec.doExecute(TraitJoinQueryExec.scala:111) at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryExec.doExecute$(TraitJoinQueryExec.scala:52) at org.apache.spark.sql.sedona_sql.strategy.join.RangeJoinExec.doExecute(RangeJoinExec.scala:37) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192) at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526) at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454) at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453) at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497) at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:49) at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:197) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:745) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:137) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:137) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doMapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:142) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doMapOutputStatisticsFuture(ShuffleExchangeExec.scala:141) at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$mapOutputStatisticsFuture$1(ShuffleExchangeExec.scala:68) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236) at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:68) at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.mapOutputStatisticsFuture$(ShuffleExchangeExec.scala:67) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:118) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:192) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 50 more

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              sebastian_eckweiler Sebastian Eckweiler
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: