Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-26450

Map of schema is built too frequently in some wide queries

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 2.4.0
    • 3.0.0
    • SQL
    • None

    Description

      When executing queries with wide projections and wide schemas, Spark rebuilds an attribute map for the same schema many times.

      For example:

      select * from orctbl where id1 = 1
      

      Assume orctbl has 6000 columns and 34 files. In that case, the above query creates an AttributeSeq object 270,000 times[1]. Each AttributeSeq instantiation builds a map of the entire list of 6000 attributes (but not until lazy val exprIdToOrdinal is referenced).

      Whenever OrcFileFormat reads a new file, it generates a new unsafe projection. That results in this function getting called:

      protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
          in.map(BindReferences.bindReference(_, inputSchema))
      

      For each column in the projection, this line calls bindReference. Each call passes inputSchema, a Sequence of Attributes, to a parameter position expecting an AttributeSeq. The compiler implicitly calls the constructor for AttributeSeq, which (lazily) builds a map for every attribute in the schema. Therefore, this function builds a map of the entire schema once for each column in the projection, and it does this for each input file. For the above example query, this accounts for 204K instantiations of AttributeSeq.

      Readers for CSV and JSON tables do something similar.

      In addition, ProjectExec also creates an unsafe projection for each task. As a result, this line gets called, which has the same issue:

        def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
          exprs.map(BindReferences.bindReference(_, inputSchema))
        }
      

      The above affects all wide queries that have a projection node, regardless of the file reader. For the example query, ProjectExec accounts for the additional 66K instantiations of the AttributeSeq.

      Spark can save time by pre-building the AttributeSeq right before the map operations in bind and toBoundExprs. The time saved depends on size of schema, size of projection, number of input files (for Orc), number of file splits (for CSV, and JSON tables), and number of tasks.

      For a 6000 column CSV table with 500K records and 34 input files, the time savings is only 6%[1] because Spark doesn't create as many unsafe projections as compared to Orc tables.

      On the other hand, for a 6000 column Orc table with 500K records and 34 input files, the time savings is about 16%[1].

      [1] based on queries run in local mode with 8 executor threads on my laptop.

      Attachments

        Issue Links

          Activity

            People

              bersprockets Bruce Robbins
              bersprockets Bruce Robbins
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: