Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-22276

Unnecessary repartitioning

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Duplicate
    • 2.2.0
    • None
    • Optimizer, SQL
    • None

    Description

      When a dataframe is sorted it is partitioned with a RangePartitioner.
      If later we aggregate by the exact same fields over which sort was applied there is a new (apparently useless) Exchange repartitioning by a HashPartitioner.
      In my use case the groupBy exchange is still very costly as the aggregate function won't reduce the data volume.

      Is there any reason why groupBy always shuffles data, or could this be improved?
      Is there currently a way to workaround for the moment, without going to mapPartitions?

      Example

      nrn_vals.printSchema()
      (nrn_vals
       .sort("post_gid")
       .groupBy("post_gid")
       .agg(F.collect_list("pre_gid").alias("pre_gids"))
       ).explain()
      

      Outputs the following

      root
       |-- pre_gid: integer (nullable = true)
       |-- post_gid: integer (nullable = true)
       |-- floatvec: array (nullable = false)
       |    |-- element: float (containsNull = true)
      
      == Physical Plan ==
      ObjectHashAggregate(keys=[post_gid#1386], functions=[collect_list(pre_gid#1385, 0, 0)])
      +- Exchange hashpartitioning(post_gid#1386, 1)
         +- ObjectHashAggregate(keys=[post_gid#1386], functions=[partial_collect_list(pre_gid#1385, 0, 0)])
            +- *Sort [post_gid#1386 ASC NULLS FIRST], true, 0
               +- Exchange rangepartitioning(post_gid#1386 ASC NULLS FIRST, 1)
                  +- *FileScan parquet [pre_gid#1385,post_gid#1386] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/media/psf/Home/dev/Functionalizer/pyspark/spykfunc_output/extended_touche..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<pre_gid:int,post_gid:int>
      
      

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Unassigned Unassigned
            ferdonline Fernando Pereira
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment