Spark / SPARK-24410

Missing optimization for Union on bucketed tables


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: SQL

    Description

      A common use case we have is a partially aggregated table plus daily increments that need to be aggregated further. We do this by unioning the two tables and aggregating again.
      We tried to optimize this process by bucketing the tables, but currently the Union operator does not leverage the bucketing of its inputs (the way the Join operator does).

      For example, create two bucketed tables a1 and a2:

          import org.apache.spark.sql.SaveMode
          import org.apache.spark.sql.functions.col

          sparkSession.range(N).selectExpr(
              "id as key",
              "id % 2 as t1",
              "id % 3 as t2")
            .repartition(col("key"))
            .write
            .mode(SaveMode.Overwrite)
            .bucketBy(3, "key")
            .sortBy("t1")
            .saveAsTable("a1")

          sparkSession.range(N).selectExpr(
              "id as key",
              "id % 2 as t1",
              "id % 3 as t2")
            .repartition(col("key"))
            .write
            .mode(SaveMode.Overwrite)
            .bucketBy(3, "key")
            .sortBy("t1")
            .saveAsTable("a2")
      
      
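      As a sanity check (my addition, not part of the original report), the bucketing metadata of each table can be confirmed through the catalog; the detailed table information should read "Num Buckets: 3" with key as the bucket column:

          // verify that the table was written bucketed as expected
          sparkSession.sql("DESCRIBE EXTENDED a1").show(100, truncate = false)
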

      For the join query we get a SortMergeJoin with no Exchange on either side, i.e. the bucketed output partitioning of the scans is leveraged:

      select * from a1 join a2 on (a1.key=a2.key)
      
      == Physical Plan ==
      *(3) SortMergeJoin [key#24L], [key#27L], Inner
      :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
      :  +- *(1) Project [key#24L, t1#25L, t2#26L]
      :     +- *(1) Filter isnotnull(key#24L)
      :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
      +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
         +- *(2) Project [key#27L, t1#28L, t2#29L]
            +- *(2) Filter isnotnull(key#27L)
               +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
      
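      For reference, the physical plans quoted in this report can be reproduced with explain(), e.g.:

          // prints the physical plan shown above
          sparkSession.sql("select * from a1 join a2 on (a1.key=a2.key)").explain()
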

      But for the aggregation after a union we still get a shuffle (the Exchange below the final HashAggregate):

      select key,count(*) from (select * from a1 union all select * from a2)z group by key
      
      == Physical Plan ==
      *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
      +- Exchange hashpartitioning(key#25L, 1)
         +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
            +- Union
               :- *(1) Project [key#25L]
               :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
               +- *(2) Project [key#28L]
                  +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
      
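      One mitigation that works today (my sketch, not something the optimizer does for you): pre-aggregate each bucketed table separately, so each per-table HashAggregate is satisfied by the bucketing and needs no Exchange, and only the already-reduced union is shuffled for the final merge:

          import org.apache.spark.sql.functions.sum

          // each groupBy below runs shuffle-free because the scan reports
          // HashPartitioning(key, 3) from the bucketing metadata
          val c1 = sparkSession.table("a1").groupBy("key").count()
          val c2 = sparkSession.table("a2").groupBy("key").count()

          // the final merge still shuffles, but over at most two rows per key
          val merged = c1.union(c2)
            .groupBy("key")
            .agg(sum("count").as("count"))
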

      Attachments

        Issue Links

        Activity


          People

            Assignee: Unassigned
            Reporter: Ohad Raviv (uzadude)
            Votes: 0
            Watchers: 9

            Dates

              Created:
              Updated:
              Resolved:
