Spark / SPARK-17593

list files on s3 very slow


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None
    • Environment: spark 2.0.0, hadoop 2.7.2 (hadoop 2.7.3)

    Description

      Let's say we have the following partitioned data:

      events_v3
      -- event_date=2015-01-01
      ---- event_hour=0
      ------ verb=follow
      --------part10000.parquet.gz 
      ---- event_hour=1
      ------ verb=click
      --------part10000.parquet.gz 
      -- event_date=2015-01-02
      ---- event_hour=5
      ------ verb=follow
      --------part10000.parquet.gz 
      ---- event_hour=10
      ------ verb=click
      --------part10000.parquet.gz 
      

      To read (or write) partitioned Parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders.

      In this case, with 300 dates, we end up with 300 listing jobs, each trying to get the file list for one date directory. This process takes about 10 minutes to finish (with 2 executors). By contrast, a Ruby script that lists all files under the same folder recursively takes about 1 minute, on the same machine, with just 1 thread.
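
      For reference, a minimal sketch that times a plain recursive listing through the Hadoop FileSystem API (the same layer Spark's listing ultimately goes through), so Spark's job overhead can be separated from the raw S3 listing cost; the s3n path is the placeholder from the Spark snippet below:

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.{FileSystem, Path}

      // Recursively list every file under the placeholder path and time it,
      // bypassing Spark so only the Hadoop/S3 listing cost is measured.
      val hadoopConf = new Configuration()
      val root = new Path("s3n://bucket_name/events_v3")
      val fs = root.getFileSystem(hadoopConf)

      val start = System.nanoTime()
      val files = fs.listFiles(root, true) // recursive = true
      var count = 0
      while (files.hasNext) { files.next(); count += 1 }
      println(s"listed $count files in ${(System.nanoTime() - start) / 1e9} s")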

      I am confused as to why listing files takes so much extra time.
      Spark code:

      val sparkSession = org.apache.spark.sql.SparkSession.builder
        .config("spark.sql.hive.metastorePartitionPruning", true)
        .config("spark.sql.parquet.filterPushdown", true)
        .config("spark.sql.hive.verifyPartitionPath", false)
        .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
        .config("parquet.enable.summary-metadata", false)
        .config("spark.sql.sources.partitionDiscovery.enabled", false)
        .getOrCreate()

      val df = sparkSession.read
        .option("mergeSchema", "false")
        .format("parquet")
        .load("s3n://bucket_name/events_v3")
      df.createOrReplaceTempView("temp_events")
      sparkSession.sql(
        """
          |select verb, count(*) from temp_events where event_date = "2016-08-05" group by verb
        """.stripMargin).show()
      
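      One knob I plan to experiment with (a sketch based on my reading of the Spark 2.0 listing code, so treat it as an assumption): `ListingFileCatalog.listLeafFiles` appears to launch the distributed listing job only when the number of paths reaches `spark.sql.sources.parallelPartitionDiscovery.threshold` (default 32); raising it keeps the listing on the driver, which may or may not help here.

      // Assumption: raising spark.sql.sources.parallelPartitionDiscovery.threshold
      // above the number of date directories (~300) keeps the listing on the
      // driver instead of launching a distributed listing job.
      val driverListingSession = org.apache.spark.sql.SparkSession.builder
        .config("spark.sql.sources.parallelPartitionDiscovery.threshold", 1000)
        .getOrCreate()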

      Ruby code:

      gem 'aws-sdk', '~> 2'
      require 'aws-sdk'

      client = Aws::S3::Client.new(region: 'us-west-1')
      next_continuation_token = nil
      total = 0
      loop do
        a = client.list_objects_v2({
          bucket: "bucket", # required
          max_keys: 1000,
          prefix: "events_v3/",
          continuation_token: next_continuation_token,
          fetch_owner: false,
        })
        puts a.contents.last.key
        total += a.contents.size
        next_continuation_token = a.next_continuation_token
        break unless a.is_truncated
      end

      puts "total"
      puts total
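
      For an apples-to-apples comparison from the JVM, a minimal Scala sketch of the same flat listing using the AWS SDK for Java (assuming the aws-java-sdk-s3 jar is on the classpath; bucket name and region are the same placeholders as above):

      import com.amazonaws.services.s3.AmazonS3ClientBuilder
      import com.amazonaws.services.s3.model.ListObjectsV2Request

      // Flat listing of every key under the prefix, 1000 keys per page,
      // without any per-directory recursion.
      val s3 = AmazonS3ClientBuilder.standard().withRegion("us-west-1").build()
      val request = new ListObjectsV2Request()
        .withBucketName("bucket") // placeholder bucket name
        .withPrefix("events_v3/")
        .withMaxKeys(1000)

      var total = 0
      var result = s3.listObjectsV2(request)
      total += result.getObjectSummaries.size()
      while (result.isTruncated) {
        request.setContinuationToken(result.getNextContinuationToken)
        result = s3.listObjectsV2(request)
        total += result.getObjectSummaries.size()
      }
      println(s"total: $total")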
      

      I tried looking into the following bug:
      https://issues.apache.org/jira/browse/HADOOP-12810
      but Hadoop 2.7.3 doesn't solve that problem.
      Stack Overflow reference:
      http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow


            People

              Assignee: Unassigned
              Reporter: Gaurav Shah (gaurav24)