Spark / SPARK-25196

Extends the analyze column command for cached tables


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels: None

    Description

      In common use cases, users read catalog table data, join or aggregate it, and then cache the result for later reuse. Because ANALYZE commands can only compute column statistics for catalog tables, the optimizer has to work with missing or inaccurate column statistics for cached data. So, I think it'd be nice if Spark could analyze cached data and hold temporary column statistics for InMemoryRelation.

      For example, we might be able to add a new API (e.g., analyzeColumnCacheQuery) to CacheManager to do so.
      POC: https://github.com/apache/spark/compare/master...maropu:AnalyzeCacheQuery

      scala> sql("SET spark.sql.cbo.enabled=true")
      scala> sql("SET spark.sql.statistics.histogram.enabled=true")
      scala> spark.range(1000).selectExpr("id % 33 AS c0", "rand() AS c1", "0 AS c2").write.saveAsTable("t")
      scala> sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c0, c1, c2")
      scala> val cacheManager = spark.sharedState.cacheManager
      scala> def printColumnStats(data: org.apache.spark.sql.DataFrame) = {
           |   data.queryExecution.optimizedPlan.stats.attributeStats.foreach {
           |     case (k, v) => println(s"[$k]: $v")
           |   }
           | }
      scala> def df() = spark.table("t").groupBy("c0").agg(count("c1").as("v1"), sum("c2").as("v2"))
      
      // Prints column statistics in catalog table `t`
      scala> printColumnStats(spark.table("t"))
      [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))
      [c1#7074]: ColumnStat(Some(997),Some(5.958619423369615E-4),Some(0.9988009488973438),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@4ef69c53)))
      [c2#7075]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(4),Some(4),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@7cbaf548)))
      
      // Prints column statistics on query result `df`
      scala> printColumnStats(df())
      [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))
      
      // Prints column statistics on cached data of `df`
      scala> printColumnStats(df().cache)
      <No Column Statistics>
      
      // A new API described above
      scala> cacheManager.analyzeColumnCacheQuery(df(), "v1" :: "v2" :: Nil)
                                                                                      
      // Then, prints again
      scala> printColumnStats(df())
      [v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
      [v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))
      
      scala> cacheManager.analyzeColumnCacheQuery(df(), "c0" :: Nil)
      scala> printColumnStats(df())
      [v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
      [v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))
      [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@626bcfc8)))
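
      The issue title suggests the same result may be reachable through the ANALYZE command itself. The following is a minimal sketch, assuming Spark 3.0.0 or later and assuming that ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS accepts a cached temporary view; it is not taken from the POC branch. The view name `agg_t`, the object name, and the synthetic data (a deterministic `c1` instead of `rand()`) are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, sum}

// Hypothetical standalone driver; in spark-shell only the body of main is needed.
object AnalyzeCachedViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("analyze-cached-view-sketch")
      .config("spark.sql.cbo.enabled", "true")
      .getOrCreate()

    // Aggregate synthetic data and expose the result as a temporary view.
    // `agg_t` is an illustrative name, not one used in the issue.
    spark.range(1000)
      .selectExpr("id % 33 AS c0", "id % 7 AS c1", "0 AS c2")
      .groupBy("c0")
      .agg(count("c1").as("v1"), sum("c2").as("v2"))
      .createOrReplaceTempView("agg_t")

    // CACHE TABLE is eager by default, so the cached data is materialized here.
    spark.sql("CACHE TABLE agg_t")

    // Assumption: on 3.0.0+ this computes statistics over the cached data
    // rather than requiring a catalog table.
    spark.sql("ANALYZE TABLE agg_t COMPUTE STATISTICS FOR COLUMNS v1, v2")

    // Same inspection as printColumnStats in the transcript above.
    spark.table("agg_t").queryExecution.optimizedPlan.stats.attributeStats.foreach {
      case (k, v) => println(s"[$k]: $v")
    }

    spark.stop()
  }
}
```

      If the view is not cached, the ANALYZE statement is expected to fail instead of computing anything, so the CACHE TABLE step should come first.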
      

          People

            Assignee: maropu Takeshi Yamamuro
            Reporter: maropu Takeshi Yamamuro
            Votes: 0
            Watchers: 8
