SPARK-24528

Add support to read multiple sorted bucket files for data source v1


Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      Closely related to SPARK-24410, we're trying to optimize a very common use case we have: getting the most up-to-date row per id from a fact table.

      We're saving the table bucketed to skip the shuffle stage, but we still "waste" time on the Sort operator even though the data is already sorted.

      Here's a good example:

      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions.col

      sparkSession.range(N).selectExpr(
          "id as key",
          "id % 2 as t1",
          "id % 3 as t2")
        .repartition(col("key"))
        .write
        .mode(SaveMode.Overwrite)
        .bucketBy(3, "key")
        .sortBy("key", "t1")
        .saveAsTable("a1")
      sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
      
      == Physical Plan ==
      SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
      +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
         +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...

       

      And here's a bad, but more realistic, example:

      sparkSession.sql("set spark.sql.shuffle.partitions=2")
      sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
      
      == Physical Plan ==
      SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
      +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
         +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
            +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...
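
      The difference seems to come down to how many files each bucket ends up with on disk: if the table is (re)written with spark.sql.shuffle.partitions=2, i.e. from two tasks, each bucket can get more than one file, and the scan stops reporting its output as sorted. A rough way to check the file count per bucket (just a sketch; the warehouse path "spark-warehouse/a1" and the bucketed file-name suffix are assumptions for a default local setup) is:

      import java.io.File

      // Count the data files per bucket id in the table's directory.
      // The path and the "..._<bucketId>.c000..." file-name pattern are
      // assumptions for a default local setup; adjust them as needed.
      val tableDir = new File("spark-warehouse/a1")
      val filesPerBucket = tableDir.listFiles()
        .filter(_.getName.startsWith("part-"))
        .groupBy(f => f.getName.replaceAll("""^.*_(\d+)\.c000.*$""", "$1"))

      filesPerBucket.toSeq.sortBy(_._1).foreach { case (bucketId, files) =>
        println(s"bucket $bucketId: ${files.length} file(s)")
      }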
      
      

       

      I've traced the problem to DataSourceScanExec#235:

      val sortOrder = if (sortColumns.nonEmpty) {
        // In case of bucketing, its possible to have multiple files belonging to the
        // same bucket in a given relation. Each of these files are locally sorted
        // but those files combined together are not globally sorted. Given that,
        // the RDD partition will not be sorted even if the relation has sort columns set
        // Current solution is to check if all the buckets have a single file in it
      
        val files = selectedPartitions.flatMap(partition => partition.files)
        val bucketToFilesGrouping =
          files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
        val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

      So obviously the code avoids dealing with this situation for now.

      Could you think of a way to solve this or bypass it?
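
      One bypass I can think of (just a sketch, and it assumes that repartition's hash partitioning lines up with the bucket-id computation, so that each write task holds exactly one bucket) is to force exactly one file per bucket so the single-file check above passes:

      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions.col

      // Repartition into as many partitions as buckets, keyed on the bucketing
      // column, before the bucketed write. Each task should then hold a single
      // bucket and write a single file, so the extra Sort should disappear.
      sparkSession.range(N).selectExpr(
          "id as key",
          "id % 2 as t1",
          "id % 3 as t2")
        .repartition(3, col("key"))
        .write
        .mode(SaveMode.Overwrite)
        .bucketBy(3, "key")
        .sortBy("key", "t1")
        .saveAsTable("a1")

      But that only works around the write side; the real fix would be to let the scan merge-read the multiple sorted files per bucket, which is what this issue's title proposes.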

            People

              Assignee: Unassigned
              Reporter: Ohad Raviv (uzadude)
              Votes: 3
              Watchers: 15
