Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Affects Version/s: 2.0.0
- Fix Version/s: None
- Component/s: None
- Labels: None
- Environment: spark 2.0.0, hadoop 2.7.2 (also tried hadoop 2.7.3)
Description
Let's say we have the following partitioned data:
events_v3
-- event_date=2015-01-01
---- event_hour=0
------ verb=follow
-------- part10000.parquet.gz
---- event_hour=1
------ verb=click
-------- part10000.parquet.gz
-- event_date=2015-01-02
---- event_hour=5
------ verb=follow
-------- part10000.parquet.gz
---- event_hour=10
------ verb=click
-------- part10000.parquet.gz
To read (or write) partitioned parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively tries to list all files and folders.
In this case, with 300 dates we end up creating 300 jobs, each trying to get the file list from its date directory. This process takes about 10 minutes to finish (with 2 executors), whereas a ruby script that recursively lists all files under the same folder takes about 1 minute, on the same machine with just 1 thread.
I am confused as to why listing files takes so much extra time.
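To make the gap concrete, here is a minimal sketch (my own illustration, not Spark's actual code, using a hypothetical s3a path) of the two listing strategies: a per-directory recursive walk like `ListingFileCatalog.listLeafFiles` performs, which issues a separate list call for every event_date/event_hour/verb directory, versus a single recursive `listFiles` call, which an object-store connector can serve with bulk key listings (what HADOOP-13208 later added to S3A). Since S3 is a flat key store with only pseudo-directories, the walk pays one round trip per directory while the flat listing pages through keys by prefix.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListingComparison {
  def main(args: Array[String]): Unit = {
    val root = new Path("s3a://bucket_name/events_v3")   // placeholder path
    val fs: FileSystem = root.getFileSystem(new Configuration())

    // Strategy 1: walk the pseudo-directory tree, issuing one listStatus call
    // per directory (roughly what the recursive leaf-file listing does).
    def walk(dir: Path): Seq[FileStatus] =
      fs.listStatus(dir).toSeq.flatMap { st =>
        if (st.isDirectory) walk(st.getPath) else Seq(st)
      }

    // Strategy 2: one recursive listFiles call; S3A can serve this with bulk
    // object listings instead of walking every directory.
    def flat(dir: Path): Seq[FileStatus] = {
      val it  = fs.listFiles(dir, true)
      val buf = scala.collection.mutable.ArrayBuffer[FileStatus]()
      while (it.hasNext) buf += it.next()
      buf.toSeq
    }

    println(s"walk: ${walk(root).size} files, flat: ${flat(root).size} files")
  }
}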
spark code:
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

val df = sparkSession.read.option("mergeSchema", "false").format("parquet").load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")

sparkSession.sql(
  """
    |select verb, count(*) from temp_events where event_date = "2016-08-05" group by verb
  """.stripMargin).show()
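Not the fix, but a possible workaround sketch for this specific query (my own assumption, not something from the ticket): point load() at the single date partition and set the "basePath" option to the table root so event_date/event_hour/verb are still recognised as partition columns; then only one date directory has to be listed. Paths reuse the placeholder bucket above.

// Workaround sketch: read only the partition the query touches.
val oneDay = sparkSession.read
  .option("mergeSchema", "false")
  .option("basePath", "s3n://bucket_name/events_v3")       // table root, so partition columns are kept
  .format("parquet")
  .load("s3n://bucket_name/events_v3/event_date=2016-08-05") // only this directory gets listed

oneDay.createOrReplaceTempView("temp_events_one_day")
sparkSession.sql(
  """
    |select verb, count(*) from temp_events_one_day group by verb
  """.stripMargin).show()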
ruby code:
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(:region => 'us-west-1')
next_continuation_token = nil
total = 0
loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end
puts "total"
puts total
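For an apples-to-apples timing from the JVM, a rough Scala equivalent of the ruby script could look like the sketch below (my illustration; it assumes the AWS SDK for Java v1 is on the classpath, and the bucket/region/prefix are the same placeholders as above). Like the ruby version it does a flat listObjectsV2 over the prefix, returning up to 1000 keys per request instead of one request per directory.

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request

object FlatS3Listing {
  def main(args: Array[String]): Unit = {
    // Flat prefix listing, paged 1000 keys at a time (same approach as the ruby script).
    val s3 = AmazonS3ClientBuilder.standard().withRegion("us-west-1").build()
    var token: String = null
    var total = 0L
    var truncated = true
    while (truncated) {
      val req = new ListObjectsV2Request()
        .withBucketName("bucket")          // placeholder bucket name
        .withPrefix("events_v3/")
        .withMaxKeys(1000)
        .withContinuationToken(token)
      val res = s3.listObjectsV2(req)
      total += res.getObjectSummaries.size()
      token = res.getNextContinuationToken
      truncated = res.isTruncated
    }
    println(s"total: $total")
  }
}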
Tried looking into the following bug:
https://issues.apache.org/jira/browse/HADOOP-12810
but hadoop 2.7.3 doesn't solve that problem
stackoverflow reference:
http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow
Issue Links
- depends upon
  - HADOOP-13208 S3A listFiles(recursive=true) to do a bulk listObjects instead of walking the pseudo-tree of directories (Resolved)
  - HADOOP-13345 S3Guard: Improved Consistency for S3A (Resolved)
- is related to
  - HADOOP-13208 S3A listFiles(recursive=true) to do a bulk listObjects instead of walking the pseudo-tree of directories (Resolved)
  - SPARK-24280 Speed up indexing of files in object stores by using listFiles(path, recursive=true) (Resolved)