Description
Since upgrading to 1.5.1, CACHE TABLE works for all tables except parquet tables. This is likely related to the native parquet reader.
Here are the steps for a parquet table:
create table test_parquet stored as parquet as select 1; explain select * from test_parquet;
With output:
== Physical Plan ==
Scan ParquetRelation[hdfs://192.168.99.9/user/hive/warehouse/test_parquet][_c0#141]
And then caching:
cache table test_parquet; explain select * from test_parquet;
With output:
== Physical Plan ==
Scan ParquetRelation[hdfs://192.168.99.9/user/hive/warehouse/test_parquet][_c0#174]
Note it isn't cached. I have included spark log output for the cache table and explain statements below.
—
Here's the same for a non-parquet table:
cache table test_no_parquet; explain select * from test_no_parquet;
With output:
== Physical Plan ==
HiveTableScan [_c0#210], (MetastoreRelation default, test_no_parquet, None)
And then caching:
cache table test_no_parquet; explain select * from test_no_parquet;
With output:
== Physical Plan ==
InMemoryColumnarTableScan [_c0#229], (InMemoryRelation [_c0#229], true, 10000, StorageLevel(true, true, false, true, 1), (HiveTableScan [_c0#211], (MetastoreRelation default, test_no_parquet, None)), Some(test_no_parquet))
Note that here the plan reads from the in-memory cache (InMemoryColumnarTableScan), so the table is cached.
—
Note that if the flag spark.sql.hive.convertMetastoreParquet is set to false, parquet tables are cached the same way as non-parquet tables. This is a reasonable workaround for us, but ideally we would like to keep the benefit of the native parquet reader.
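The workaround can be sketched as the session below. It assumes the flag is set per session with a SET statement before the table is cached, and it reuses the test_parquet table from the steps above. Per the behaviour described in this report, the plan should then show InMemoryColumnarTableScan instead of Scan ParquetRelation, but that has only been observed on the 1.5.1 setup described here.

```sql
-- Assumption: set for the current session only; the same key could
-- also be set in the server configuration instead.
set spark.sql.hive.convertMetastoreParquet=false;

-- Cache the table and check which scan the physical plan uses.
cache table test_parquet;
explain select * from test_parquet;
```

With the flag off, the table is read through the Hive SerDe path rather than the native parquet reader, which is the trade-off mentioned above.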
—
Spark logs for cache table for test_parquet:
15/10/21 21:22:05 INFO thriftserver.SparkExecuteStatementOperation: Running query 'cache table test_parquet' with 20ee2ab9-5242-4783-81cf-46115ed72610
15/10/21 21:22:05 INFO metastore.HiveMetaStore: 49: get_table : db=default tbl=test_parquet
15/10/21 21:22:05 INFO HiveMetaStore.audit: ugi=vagrant ip=unknown-ip-addr cmd=get_table : db=default tbl=test_parquet
15/10/21 21:22:05 INFO metastore.HiveMetaStore: 49: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/10/21 21:22:05 INFO metastore.ObjectStore: ObjectStore, initialize called
15/10/21 21:22:05 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
15/10/21 21:22:05 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is MYSQL
15/10/21 21:22:05 INFO metastore.ObjectStore: Initialized ObjectStore
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(215680) called with curMem=4196713, maxMem=139009720
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_59 stored as values in memory (estimated size 210.6 KB, free 128.4 MB)
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(20265) called with curMem=4412393, maxMem=139009720
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_59_piece0 stored as bytes in memory (estimated size 19.8 KB, free 128.3 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_59_piece0 in memory on 192.168.99.9:50262 (size: 19.8 KB, free: 132.2 MB)
15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 59 from run at AccessController.java:-2
15/10/21 21:22:05 INFO metastore.HiveMetaStore: 49: get_table : db=default tbl=test_parquet
15/10/21 21:22:05 INFO HiveMetaStore.audit: ugi=vagrant ip=unknown-ip-addr cmd=get_table : db=default tbl=test_parquet
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(215680) called with curMem=4432658, maxMem=139009720
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_60 stored as values in memory (estimated size 210.6 KB, free 128.1 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_58_piece0 on 192.168.99.9:50262 in memory (size: 19.8 KB, free: 132.2 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_57_piece0 on 192.168.99.9:50262 in memory (size: 21.1 KB, free: 132.2 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_57_piece0 on slave2:46912 in memory (size: 21.1 KB, free: 534.5 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_57_piece0 on slave0:46599 in memory (size: 21.1 KB, free: 534.3 MB)
15/10/21 21:22:05 INFO spark.ContextCleaner: Cleaned accumulator 86
15/10/21 21:22:05 INFO spark.ContextCleaner: Cleaned accumulator 84
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(20265) called with curMem=4327620, maxMem=139009720
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_60_piece0 stored as bytes in memory (estimated size 19.8 KB, free 128.4 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_60_piece0 in memory on 192.168.99.9:50262 (size: 19.8 KB, free: 132.2 MB)
15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 60 from run at AccessController.java:-2
15/10/21 21:22:05 INFO spark.SparkContext: Starting job: run at AccessController.java:-2
15/10/21 21:22:05 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://192.168.99.9/user/hive/warehouse/test_parquet/part-r-00000-7cf64eb9-76ca-47c7-92aa-eb5ba879faae.gz.parquet
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Registering RDD 171 (run at AccessController.java:-2)
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Got job 24 (run at AccessController.java:-2) with 1 output partitions
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Final stage: ResultStage 34(run at AccessController.java:-2)
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 33)
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 33)
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 33 (MapPartitionsRDD[171] at run at AccessController.java:-2), which has no missing parents
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(9472) called with curMem=4347885, maxMem=139009720
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_61 stored as values in memory (estimated size 9.3 KB, free 128.4 MB)
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(4838) called with curMem=4357357, maxMem=139009720
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_61_piece0 stored as bytes in memory (estimated size 4.7 KB, free 128.4 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_61_piece0 in memory on 192.168.99.9:50262 (size: 4.7 KB, free: 132.2 MB)
15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 61 from broadcast at DAGScheduler.scala:861
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 33 (MapPartitionsRDD[171] at run at AccessController.java:-2)
15/10/21 21:22:05 INFO cluster.YarnScheduler: Adding task set 33.0 with 1 tasks
15/10/21 21:22:05 INFO scheduler.FairSchedulableBuilder: Added task set TaskSet_33 tasks to pool default
15/10/21 21:22:05 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 33.0 (TID 45, slave2, NODE_LOCAL, 2234 bytes)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_61_piece0 in memory on slave2:46912 (size: 4.7 KB, free: 534.5 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_60_piece0 in memory on slave2:46912 (size: 19.8 KB, free: 534.4 MB)
15/10/21 21:22:05 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 33.0 (TID 45) in 105 ms on slave2 (1/1)
15/10/21 21:22:05 INFO cluster.YarnScheduler: Removed TaskSet 33.0, whose tasks have all completed, from pool default
15/10/21 21:22:05 INFO scheduler.DAGScheduler: ShuffleMapStage 33 (run at AccessController.java:-2) finished in 0.105 s
15/10/21 21:22:05 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/10/21 21:22:05 INFO scheduler.DAGScheduler: running: Set()
15/10/21 21:22:05 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 34)
15/10/21 21:22:05 INFO scheduler.DAGScheduler: failed: Set()
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Missing parents for ResultStage 34: List()
15/10/21 21:22:05 INFO scheduler.StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@532f49c8
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting ResultStage 34 (MapPartitionsRDD[174] at run at AccessController.java:-2), which is now runnable
15/10/21 21:22:05 INFO scheduler.StatsReportListener: task runtime:(count: 1, mean: 105.000000, stdev: 0.000000, max: 105.000000, min: 105.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 105.0 ms 105.0 ms 105.0 ms 105.0 ms 105.0 ms 105.0 ms 105.0 ms 105.0 ms 105.0 ms
15/10/21 21:22:05 INFO scheduler.StatsReportListener: shuffle bytes written:(count: 1, mean: 49.000000, stdev: 0.000000, max: 49.000000, min: 49.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 49.0 B 49.0 B 49.0 B 49.0 B 49.0 B 49.0 B 49.0 B 49.0 B 49.0 B
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(10440) called with curMem=4362195, maxMem=139009720
15/10/21 21:22:05 INFO scheduler.StatsReportListener: task result size:(count: 1, mean: 2381.000000, stdev: 0.000000, max: 2381.000000, min: 2381.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 2.3 KB 2.3 KB 2.3 KB 2.3 KB 2.3 KB 2.3 KB 2.3 KB 2.3 KB 2.3 KB
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_62 stored as values in memory (estimated size 10.2 KB, free 128.4 MB)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: executor (non-fetch) time pct: (count: 1, mean: 68.571429, stdev: 0.000000, max: 68.571429, min: 68.571429)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 69 % 69 % 69 % 69 % 69 % 69 % 69 % 69 % 69 %
15/10/21 21:22:05 INFO scheduler.StatsReportListener: other time pct: (count: 1, mean: 31.428571, stdev: 0.000000, max: 31.428571, min: 31.428571)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 31 % 31 % 31 % 31 % 31 % 31 % 31 % 31 % 31 %
15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(5358) called with curMem=4372635, maxMem=139009720
15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_62_piece0 stored as bytes in memory (estimated size 5.2 KB, free 128.4 MB)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_62_piece0 in memory on 192.168.99.9:50262 (size: 5.2 KB, free: 132.2 MB)
15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 62 from broadcast at DAGScheduler.scala:861
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 34 (MapPartitionsRDD[174] at run at AccessController.java:-2)
15/10/21 21:22:05 INFO cluster.YarnScheduler: Adding task set 34.0 with 1 tasks
15/10/21 21:22:05 INFO scheduler.FairSchedulableBuilder: Added task set TaskSet_34 tasks to pool default
15/10/21 21:22:05 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 34.0 (TID 46, slave2, PROCESS_LOCAL, 1914 bytes)
15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_62_piece0 in memory on slave2:46912 (size: 5.2 KB, free: 534.4 MB)
15/10/21 21:22:05 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 9 to slave2:43867
15/10/21 21:22:05 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 135 bytes
15/10/21 21:22:05 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 34.0 (TID 46) in 48 ms on slave2 (1/1)
15/10/21 21:22:05 INFO cluster.YarnScheduler: Removed TaskSet 34.0, whose tasks have all completed, from pool default
15/10/21 21:22:05 INFO scheduler.DAGScheduler: ResultStage 34 (run at AccessController.java:-2) finished in 0.047 s
15/10/21 21:22:05 INFO scheduler.StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@37a20848
15/10/21 21:22:05 INFO scheduler.StatsReportListener: task runtime:(count: 1, mean: 48.000000, stdev: 0.000000, max: 48.000000, min: 48.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms
15/10/21 21:22:05 INFO scheduler.StatsReportListener: fetch wait time:(count: 1, mean: 0.000000, stdev: 0.000000, max: 0.000000, min: 0.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0.0 ms 0.0 ms 0.0 ms 0.0 ms 0.0 ms 0.0 ms 0.0 ms 0.0 ms 0.0 ms
15/10/21 21:22:05 INFO scheduler.StatsReportListener: remote bytes read:(count: 1, mean: 0.000000, stdev: 0.000000, max: 0.000000, min: 0.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B
15/10/21 21:22:05 INFO scheduler.StatsReportListener: task result size:(count: 1, mean: 1737.000000, stdev: 0.000000, max: 1737.000000, min: 1737.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 1737.0 B 1737.0 B 1737.0 B 1737.0 B 1737.0 B 1737.0 B 1737.0 B 1737.0 B 1737.0 B
15/10/21 21:22:05 INFO scheduler.StatsReportListener: executor (non-fetch) time pct: (count: 1, mean: 29.166667, stdev: 0.000000, max: 29.166667, min: 29.166667)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 29 % 29 % 29 % 29 % 29 % 29 % 29 % 29 % 29 %
15/10/21 21:22:05 INFO scheduler.StatsReportListener: fetch wait time pct: (count: 1, mean: 0.000000, stdev: 0.000000, max: 0.000000, min: 0.000000)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0 % 0 % 0 % 0 % 0 % 0 % 0 % 0 % 0 %
15/10/21 21:22:05 INFO scheduler.StatsReportListener: other time pct: (count: 1, mean: 70.833333, stdev: 0.000000, max: 70.833333, min: 70.833333)
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 0% 5% 10% 25% 50% 75% 90% 95% 100%
15/10/21 21:22:05 INFO scheduler.StatsReportListener: 71 % 71 % 71 % 71 % 71 % 71 % 71 % 71 % 71 %
15/10/21 21:22:05 INFO scheduler.DAGScheduler: Job 24 finished: run at AccessController.java:-2, took 0.175295 s
Spark logs for explain for test_parquet:
15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Running query 'explain select * from test_parquet' with bae9c0bf-57f9-4c80-b745-3f0202469f3f
15/10/21 21:23:19 INFO parse.ParseDriver: Parsing command: explain select * from test_parquet
15/10/21 21:23:19 INFO parse.ParseDriver: Parse Completed
15/10/21 21:23:19 INFO metastore.HiveMetaStore: 50: get_table : db=default tbl=test_parquet
15/10/21 21:23:19 INFO HiveMetaStore.audit: ugi=vagrant ip=unknown-ip-addr cmd=get_table : db=default tbl=test_parquet
15/10/21 21:23:19 INFO metastore.HiveMetaStore: 50: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/10/21 21:23:19 INFO metastore.ObjectStore: ObjectStore, initialize called
15/10/21 21:23:19 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
15/10/21 21:23:19 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is MYSQL
15/10/21 21:23:19 INFO metastore.ObjectStore: Initialized ObjectStore
15/10/21 21:23:19 INFO storage.MemoryStore: ensureFreeSpace(215680) called with curMem=4377993, maxMem=139009720
15/10/21 21:23:19 INFO storage.MemoryStore: Block broadcast_63 stored as values in memory (estimated size 210.6 KB, free 128.2 MB)
15/10/21 21:23:19 INFO storage.MemoryStore: ensureFreeSpace(20265) called with curMem=4593673, maxMem=139009720
15/10/21 21:23:19 INFO storage.MemoryStore: Block broadcast_63_piece0 stored as bytes in memory (estimated size 19.8 KB, free 128.2 MB)
15/10/21 21:23:19 INFO storage.BlockManagerInfo: Added broadcast_63_piece0 in memory on 192.168.99.9:50262 (size: 19.8 KB, free: 132.2 MB)
15/10/21 21:23:19 INFO spark.SparkContext: Created broadcast 63 from run at AccessController.java:-2
15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Result Schema: List(plan#262)
15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Result Schema: List(plan#262)
15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Result Schema: List(plan#262)
Issue Links
- is duplicated by SPARK-12167 Invoke the right sameResult function when plan is warpped with SubQueries (Closed)