Description
As of https://github.com/apache/spark/pull/21018, InMemoryRelation includes its cacheBuilder when logging query plans. This CachedRDDBuilder includes the cachedPlan, so calling treeString on InMemoryRelation will log the cachedPlan in the cacheBuilder.
Given the sample dataset:
$ cat test.csv A,B 0,0
If the query plan has multiple cached datasets that depend on each other:
var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache() 0 to 1 foreach { _ => df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache() } df_cached.explain
results in:
== Physical Plan == InMemoryTableScan [A#10, B#11, B#35, B#87] +- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87] +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight :- *(2) Filter isnotnull(A#10) : +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)] : +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35] +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight :- *(2) Filter isnotnull(A#10) : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)] : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) +- *(1) Filter isnotnull(A#34) +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)] +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : +- *(2) Project [A#10, B#11, B#35] : +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight : :- *(2) Filter isnotnull(A#10) : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)] : : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) : +- *(1) Filter isnotnull(A#34) : +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)] : +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) +- *(1) Filter isnotnull(A#86) +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)] +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) +- *(2) Project [A#10, B#11, B#35, B#87] +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight :- *(2) Filter isnotnull(A#10) : +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)] : +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35] +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight :- *(2) Filter isnotnull(A#10) : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)] : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) +- *(1) Filter isnotnull(A#34) +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)] +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : +- *(2) Project [A#10, B#11, B#35] : +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight : :- *(2) Filter isnotnull(A#10) : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)] : : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) : +- *(1) Filter isnotnull(A#34) : +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)] : +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) +- *(1) Filter isnotnull(A#86) +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)] +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> ,None) +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
previously this yielded:
== Physical Plan == InMemoryTableScan [A#10, B#11, B#37, B#89] +- InMemoryRelation [A#10, B#11, B#37, B#89], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(2) Project [A#10, B#11, B#37, B#89] +- *(2) BroadcastHashJoin [A#10], [A#88], Inner, BuildRight :- *(2) Filter isnotnull(A#10) : +- InMemoryTableScan [A#10, B#11, B#37], [isnotnull(A#10)] : +- InMemoryRelation [A#10, B#11, B#37], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(2) Project [A#10, B#11, B#37] : +- *(2) BroadcastHashJoin [A#10], [A#36], Inner, BuildRight : :- *(2) Filter isnotnull(A#10) : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)] : : +- InMemoryRelation [A#10, B#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) : +- *(1) Filter isnotnull(A#36) : +- InMemoryTableScan [A#36, B#37], [isnotnull(A#36)] : +- InMemoryRelation [A#36, B#37], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) +- *(1) Filter isnotnull(A#88) +- InMemoryTableScan [A#88, B#89], [isnotnull(A#88)] +- InMemoryRelation [A#88, B#89], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
This exponential growth can OOM the driver on large query plans with cached datasets.