SPARK-22599: Avoid extra reading for cached table


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: SQL

    Description

      In the current implementation of Spark, InMemoryTableScanExec reads all the data in a cached table, filters CachedBatches according to their stats, and passes the surviving data to the downstream operators. This makes it inefficient to keep a whole table resident in memory to serve various queries against different partitions of the table, a scenario that covers a significant portion of our users' workloads.

      The following is an example of such a use case:

      store_sales is a 1 TB table in cloud storage, partitioned by 'location'. The first query, Q1, outputs several metrics A, B, C for the stores in all locations. After that, a small team of three data scientists wants to do some causal analysis of the sales in different locations. To avoid unnecessary I/O and Parquet/ORC parsing overhead, they cache the whole table in memory during Q1.

      With the current implementation, even though each data scientist is interested in only one of the three locations, every query they submit to the Spark cluster still reads the entire 1 TB of data.
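
      The following is a minimal sketch of this workload (the path, session setup, and column names are illustrative, not part of this report):

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions._

      val spark = SparkSession.builder().appName("store-sales-analysis").getOrCreate()
      import spark.implicits._

      // store_sales: ~1 TB in cloud storage, partitioned by 'location'
      val storeSales = spark.read.parquet("s3://warehouse/store_sales")
      storeSales.cache()

      // Q1: scans all locations and materializes the whole table in memory
      storeSales.groupBy("location")
        .agg(sum("ss_net_paid"), avg("ss_quantity"), count("ss_ticket_number"))
        .show()

      // Follow-up analysis: logically needs a single location, but today it
      // still reads every CachedBatch of the cached table
      storeSales.filter($"location" === "loc_42")
        .agg(avg("ss_net_paid"))
        .show()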

      The reason behind the extra reading is that CachedBatch is implemented as

      case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
      // numRows: rows in this batch; buffers: compressed column data; stats: per-column stats kept inline
      

      where "stats" is part of every CachedBatch, so the only way to filter batches for the output of the InMemoryTableScanExec operator is to read all the data in the in-memory table as input. The extra reading becomes even more unacceptable when some of the table's data has been evicted to disk.
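
      Conceptually, the current flow reduces to the following simplified sketch (illustrative only, not the exact Spark internals): the stats predicate can only be evaluated after each batch, data buffers included, has already been fetched.

      // Simplified model of today's behavior: stats travel inside each
      // CachedBatch, so the filter runs only after the batch is read.
      def filterBatches(
          batches: Iterator[CachedBatch],             // already fetched from storage
          statsPredicate: InternalRow => Boolean): Iterator[CachedBatch] =
        batches.filter(b => statsPredicate(b.stats))  // pruning happens post-read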

      We propose to introduce a new type of block, the metadata block, for the partitions of the RDD representing the data in the cached table. Every metadata block contains the stats info for all columns in a partition and is saved to the BlockManager when the compute() method is executed for the partition. To minimize the number of bytes to read, the scan can then fetch only the small metadata blocks first, prune the partitions whose stats cannot match the query's predicates, and read the data blocks only for the partitions that survive pruning.
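
      A minimal, self-contained sketch of the idea (all names below are hypothetical and chosen for illustration; the design doc defines the actual interfaces):

      // Hypothetical split of a cached partition into a small metadata
      // block and a large data block.
      case class ColumnStats(lower: Long, upper: Long)            // per-column min/max
      case class MetadataBlock(batchStats: Seq[Seq[ColumnStats]]) // stats per batch in the partition
      case class DataBlock(batches: Seq[Array[Byte]])             // the CachedBatch payloads

      // Pruning touches only the metadata blocks; the data blocks of
      // partitions whose stats rule out the predicate are never read.
      def partitionsToRead(
          metadata: Map[Int, MetadataBlock],                      // partitionId -> metadata block
          mayMatch: Seq[ColumnStats] => Boolean): Set[Int] =
        metadata.collect {
          case (pid, meta) if meta.batchStats.exists(mayMatch) => pid
        }.toSet

      // e.g. keep partitions where column 0 may contain the value 42:
      //   partitionsToRead(meta, s => s(0).lower <= 42 && 42 <= s(0).upper)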

      More details can be found in the design doc: https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing

      Performance test results:

      Environment: 6 executors, each with 16 cores and 90 GB memory

      Dataset: 1 TB TPC-DS data

      Queries: 4 queries (Q19, Q46, Q34, Q27) from https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala

      Results: https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing

          People

            Assignee: Unassigned
            Reporter: Nan Zhu (codingcat)