Description
spark.range(100000000L).selectExpr("id as a", "id as b").write.saveAsTable("t1")

sql(
  """
    |WITH base
    |  AS (select *, ROW_NUMBER() OVER(ORDER BY a) AS new_a FROM t1)
    |SELECT * FROM base t1 JOIN base t2 ON t1.a = t2.b
    |""".stripMargin).explain()
The output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [a#10L], [b#26L], Inner
   :- Filter isnotnull(a#10L)
   :  +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_a#8], [a#10L ASC NULLS FIRST]
   :     +- Sort [a#10L ASC NULLS FIRST], false, 0
   :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=50]
   :           +- FileScan parquet spark_catalog.default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
   +- Sort [b#26L ASC NULLS FIRST], false, 0
      +- Filter isnotnull(b#26L)
         +- Window [row_number() windowspecdefinition(a#25L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_a#27], [a#25L ASC NULLS FIRST]
            +- Sort [a#25L ASC NULLS FIRST], false, 0
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=54]
                  +- FileScan parquet spark_catalog.default.t1[a#25L,b#26L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
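Note that the CTE base is inlined into each of its two references, so both sides of the join contain the same expensive subtree: a single-partition Exchange, a global Sort, and the row_number() Window are each planned and executed twice over the full table instead of being computed once and shared.

As a rough workaround sketch (not from the original report; it assumes a spark-shell style session where spark and sql are in scope, and base_cached is a name chosen here for illustration), caching the windowed result makes both join sides scan the same in-memory relation rather than re-running Sort + Window:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Compute the window once and cache it; after the first action, both join
// sides read the cached relation (InMemoryTableScan in the plan).
val base = spark.table("t1")
  .withColumn("new_a", row_number().over(Window.orderBy("a")))
base.cache()
base.createOrReplaceTempView("base_cached")

sql("SELECT * FROM base_cached t1 JOIN base_cached t2 ON t1.a = t2.b").explain()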