SPARK-22481: CatalogImpl.refreshTable is slow


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.1, 2.1.2, 2.2.0
    • Fix Version/s: None
    • Component/s: SQL

    Description

      CatalogImpl.refreshTable was changed in 2.1.1, and since then it has become really slow.
      The cause is that it now always creates a Dataset, which is redundant most of the time: the Dataset is only needed if the table is cached.

      Before 2.1.1:

      override def refreshTable(tableName: String): Unit = {
        val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
        // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
        // Non-temp tables: refresh the metadata cache.
        sessionCatalog.refreshTable(tableIdent)

        // If this table is cached as an InMemoryRelation, drop the original
        // cached version and make the new version cached lazily.
        val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
        // Use lookupCachedData directly since RefreshTable also takes databaseName.
        val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
        if (isCached) {
          // Create a data frame to represent the table.
          // TODO: Use uncacheTable once it supports database name.
          val df = Dataset.ofRows(sparkSession, logicalPlan)
          // Uncache the logicalPlan.
          sparkSession.sharedState.cacheManager.uncacheQuery(df, blocking = true)
          // Cache it again.
          sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table))
        }
      }

      After 2.1.1:

      override def refreshTable(tableName: String): Unit = {
        val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
        // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
        // Non-temp tables: refresh the metadata cache.
        sessionCatalog.refreshTable(tableIdent)

        // If this table is cached as an InMemoryRelation, drop the original
        // cached version and make the new version cached lazily.
        val table = sparkSession.table(tableIdent)
        if (isCached(table)) {
          // Uncache the logicalPlan.
          sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
          // Cache it again.
          sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
        }
      }
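
The difference between the two versions can be illustrated with a small, Spark-free sketch. Everything in it (`Plan`, `Session`, `buildDataset`, `RefreshSketch`) is a hypothetical stand-in invented for illustration, not a Spark class or method. It only models the ordering of the two steps: the "eager" variant builds the Dataset before checking the cache, as the post-2.1.1 code does, while the "lazy" variant checks the cache against the plan first, as the pre-2.1.1 code does. Whether a plan-based cache check is still appropriate in later Spark versions is not something this issue confirms.

```scala
// Hypothetical stand-ins for illustration only; none of these are Spark classes.
final case class Plan(table: String)

final class Session(cachedPlans: Set[Plan]) {
  // Counts how many times the expensive step ran.
  var analyzedCount = 0

  // Cheap: resolve a table name to a plan.
  def lookupPlan(table: String): Plan = Plan(table)

  // Cheap: cache lookup keyed on the plan.
  def isCached(plan: Plan): Boolean = cachedPlans.contains(plan)

  // Expensive: stands in for building a Dataset from the plan.
  def buildDataset(plan: Plan): String = {
    analyzedCount += 1
    s"Dataset(${plan.table})"
  }
}

object RefreshSketch {
  // Placeholder for the uncache-then-cache step.
  private def recache(ds: String): Unit = ()

  // Shape of the post-2.1.1 code: always build the Dataset, then check the cache.
  def refreshEager(s: Session, table: String): Unit = {
    val plan = s.lookupPlan(table)
    val ds = s.buildDataset(plan)
    if (s.isCached(plan)) recache(ds)
  }

  // Shape of the pre-2.1.1 code: check the cache first, build only when needed.
  def refreshLazy(s: Session, table: String): Unit = {
    val plan = s.lookupPlan(table)
    if (s.isCached(plan)) recache(s.buildDataset(plan))
  }
}
```

In this model, refreshing an uncached table with `refreshLazy` never calls `buildDataset`, which is the saving the description asks for.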

          People

            Assignee: Unassigned
            Reporter: Ran Haim
            Votes: 0
            Watchers: 5
