Spark / SPARK-24850

Query plan string representation grows exponentially on queries with recursive cached datasets


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.0
    • Component/s: SQL
    • Labels: None

    Description

      As of https://github.com/apache/spark/pull/21018, InMemoryRelation includes its cacheBuilder when logging query plans. Because this CachedRDDBuilder contains the cachedPlan, calling treeString on an InMemoryRelation renders the cached plan twice: once inside the CachedRDDBuilder's argument string and once as the relation's child subtree. With nested cached relations, the rendered output therefore roughly doubles at every level of nesting.
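
      A minimal, self-contained sketch of this doubling, using hypothetical stand-in types rather than Spark's actual classes:

      // Hypothetical stand-ins, not Spark's classes: the point is only that
      // the cached child plan is rendered twice per node.
      sealed trait Plan { def treeString: String }

      case class Scan(path: String) extends Plan {
        def treeString: String = s"Scan($path)"
      }

      case class CachedRelation(child: Plan) extends Plan {
        // Plays the role of CachedRDDBuilder.toString, which embeds cachedPlan.
        private def builderString: String = s"CachedRDDBuilder(${child.treeString})"
        // The child appears inside the builder string and again as the child
        // node below it: two copies per level, hence exponential growth.
        def treeString: String = s"CachedRelation($builderString)\n+- ${child.treeString}"
      }

      object Demo extends App {
        var plan: Plan = Scan("test.csv")
        for (depth <- 1 to 5) {
          plan = CachedRelation(plan)
          println(s"depth $depth: ${plan.treeString.length} chars") // ~doubles each step
        }
      }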

      Given the sample dataset:

      $ cat test.csv
      A,B
      0,0

      If the query plan has multiple cached datasets that depend on each other:

      var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
      0 to 1 foreach { _ =>
        df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
      }
      df_cached.explain
      
      

      results in:

      == Physical Plan ==
      InMemoryTableScan [A#10, B#11, B#35, B#87]
      +- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
      +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
      :- *(2) Filter isnotnull(A#10)
      : +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
      : +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
      +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
      :- *(2) Filter isnotnull(A#10)
      : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
      : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Filter isnotnull(A#34)
      +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
      +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : +- *(2) Project [A#10, B#11, B#35]
      : +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
      : :- *(2) Filter isnotnull(A#10)
      : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
      : : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      : +- *(1) Filter isnotnull(A#34)
      : +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
      : +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Filter isnotnull(A#86)
      +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
      +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      +- *(2) Project [A#10, B#11, B#35, B#87]
      +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
      :- *(2) Filter isnotnull(A#10)
      : +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
      : +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
      +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
      :- *(2) Filter isnotnull(A#10)
      : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
      : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Filter isnotnull(A#34)
      +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
      +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : +- *(2) Project [A#10, B#11, B#35]
      : +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
      : :- *(2) Filter isnotnull(A#10)
      : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
      : : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      : +- *(1) Filter isnotnull(A#34)
      : +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
      : +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Filter isnotnull(A#86)
      +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
      +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      ,None)
      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      
      

      Previously, this yielded:

      == Physical Plan ==
      InMemoryTableScan [A#10, B#11, B#37, B#89]
      +- InMemoryRelation [A#10, B#11, B#37, B#89], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *(2) Project [A#10, B#11, B#37, B#89]
      +- *(2) BroadcastHashJoin [A#10], [A#88], Inner, BuildRight
      :- *(2) Filter isnotnull(A#10)
      : +- InMemoryTableScan [A#10, B#11, B#37], [isnotnull(A#10)]
      : +- InMemoryRelation [A#10, B#11, B#37], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      : +- *(2) Project [A#10, B#11, B#37]
      : +- *(2) BroadcastHashJoin [A#10], [A#36], Inner, BuildRight
      : :- *(2) Filter isnotnull(A#10)
      : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
      : : +- InMemoryRelation [A#10, B#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      : +- *(1) Filter isnotnull(A#36)
      : +- InMemoryTableScan [A#36, B#37], [isnotnull(A#36)]
      : +- InMemoryRelation [A#36, B#37], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Filter isnotnull(A#88)
      +- InMemoryTableScan [A#88, B#89], [isnotnull(A#88)]
      +- InMemoryRelation [A#88, B#89], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
      
      

      This exponential growth can OOM the driver on large query plans with cached datasets.
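
      One way to observe the growth directly is to measure the plan string length per iteration in spark-shell; a rough sketch, reusing the same test.csv as above:

      // Rough measurement sketch: before the fix, the physical plan's string
      // length roughly doubles with each additional cached join.
      def loadCsv() = spark.read.format("csv").option("header", "true").load("test.csv")

      var df = loadCsv().cache()
      for (i <- 1 to 4) {
        df = df.join(loadCsv(), "A").cache()
        println(s"iteration $i: ${df.queryExecution.executedPlan.treeString.length} chars")
      }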

          People

            Assignee: Onur Satici
            Reporter: Onur Satici