
SPARK-22898: collect_set aggregation on bucketed table causes an exchange stage

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels:

      Description

      I'm using Spark 2.2 and evaluating Spark's bucketing in a POC. I've created a bucketed table; here's the output of desc formatted my_bucketed_tbl:

      -------------------------------------------
      col_name            data_type  comment
      -------------------------------------------
      bundle              string     null
      ifa                 string     null
      date_               date       null
      hour                int        null

      # Detailed Table ...
      Database            default
      Table               my_bucketed_tbl
      Owner               zeppelin
      Created             Thu Dec 21 13:43:...
      Last Access         Thu Jan 01 00:00:...
      Type                EXTERNAL
      Provider            orc
      Num Buckets         16
      Bucket Columns      [`ifa`]
      Sort Columns        [`ifa`]
      Table Properties    [transient_lastDd...
      Location            hdfs:/user/hive/w...
      Serde Library       org.apache.hadoop...
      InputFormat         org.apache.hadoop...
      OutputFormat        org.apache.hadoop...
      Storage Properties  [serialization.fo...
      -------------------------------------------
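
      For reference, a table with this layout could be created along these lines (a minimal sketch; the actual DDL is not part of this report, so the column types and bucket spec are taken from the desc output above):

      // Hypothetical DDL matching the described table: ORC format,
      // 16 buckets on ifa, sorted by ifa within each bucket.
      sql("""
        CREATE TABLE my_bucketed_tbl (bundle string, ifa string, date_ date, hour int)
        USING orc
        CLUSTERED BY (ifa) SORTED BY (ifa) INTO 16 BUCKETS
      """)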

      When I run explain on a group-by query, I can see that the exchange stage is avoided:

      sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
      
      == Physical Plan ==
      SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
      +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
         +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
            +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
      
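      Since the table is bucketed and sorted on ifa, the scan already satisfies the aggregation's required distribution, so no shuffle is needed. One way to check this programmatically is to look for Exchange nodes in the executed plan (a sketch relying on the internal org.apache.spark.sql.execution.exchange.Exchange class, which may change between versions):

      import org.apache.spark.sql.execution.exchange.Exchange

      // Collect any Exchange (shuffle) operators from the physical plan.
      val maxPlan = sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa")
        .queryExecution.executedPlan
      assert(maxPlan.collect { case e: Exchange => e }.isEmpty) // no shuffle expected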

      But when I replace Spark's max function with collect_set, the execution plan is the same as for a non-bucketed table, i.e. the exchange stage is not avoided:

      sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain
      
      == Physical Plan ==
      ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
      +- Exchange hashpartitioning(ifa#1010, 200)
         +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
            +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
      
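      Running the same Exchange check against the collect_set query confirms the extra shuffle (same caveat about internal APIs as above):

      // The bucketed scan should already provide hash partitioning on ifa,
      // yet an Exchange operator still appears for collect_set.
      val setPlan = sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa")
        .queryExecution.executedPlan
      assert(setPlan.collect { case e: Exchange => e }.nonEmpty) // unexpected Exchange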


            People

            • Assignee: Unassigned
            • Reporter: Modi Tamam (modi.tamam@gmail.com)
            • Votes: 0
            • Watchers: 2
