Spark / SPARK-21074

Parquet files are read fully even though only count() is requested


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None

    Description

      The following sample code creates the Parquet files:

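      import org.apache.spark.sql.{SaveMode, SparkSession}

      // Record type matching the schema reported by parquet-tools below
      case class Event(time: Long, country: String)

      val spark = SparkSession.builder()
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
            .config("spark.hadoop.parquet.metadata.read.parallelism", "50")
            .appName("Test Write").getOrCreate()
      
      val sqc = spark.sqlContext
      import sqc.implicits._
      
      // About 10 million events, each with a random 2-character string
      val random = new scala.util.Random(31L)
      (1465720077 to 1465720077+10000000).map(x => Event(x, random.nextString(2)))
        .toDS()
        .write
        .mode(SaveMode.Overwrite)
        .parquet("s3://my-bucket/test")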
      

      I then read these files back with the following code:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
            .config("spark.hadoop.parquet.metadata.read.parallelism", "50")
            .config("spark.sql.parquet.filterPushdown", "true")
            .appName("Test Read").getOrCreate()
      
      // Only the row count is requested; no column values are needed
      spark.sqlContext.read
            .option("mergeSchema", "false")
            .parquet("s3://my-bucket/test")
            .count()
      

      I enabled DEBUG logging to see which requests are actually sent to the S3 API, and found that, in addition to the requests that retrieve the Parquet footer, there are requests that fetch the whole file content.

      For example, here is a full-content request:

      17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]"
      ....
      17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]"
      ....
      17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"
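      When scanning DEBUG wire logs like this one, the Content-Range response header is what distinguishes a full-object read from a ranged read: a response covering bytes 0 through total-1 is the entire object. Below is a minimal stdlib-only Scala sketch of that check. The names `ContentRange`, `parse` and `isFullObject` are hypothetical, and it handles only the plain RFC 7233 form `bytes first-last/total`.

```scala
object ContentRange {
  // Plain byte-range form, e.g. "bytes 0-7472093/7472094".
  // The "bytes */N" (unsatisfied) and "/*" (unknown length) forms are not handled.
  private val Pattern = """bytes (\d+)-(\d+)/(\d+)""".r

  /** Returns (first, last, total), with first/last inclusive, or None if unparseable. */
  def parse(header: String): Option[(Long, Long, Long)] = header.trim match {
    case Pattern(first, last, total) => Some((first.toLong, last.toLong, total.toLong))
    case _                           => None
  }

  /** True when the response body is the whole object, i.e. bytes 0..total-1. */
  def isFullObject(header: String): Boolean =
    parse(header).exists { case (first, last, total) => first == 0L && last == total - 1 }
}
```

      Applied to the log above, `bytes 0-7472093/7472094` is classified as a full-object read of the 7,472,094-byte file.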
      

      And here is a partial (footer-only) request:

      17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1
      ....
      17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> Range: bytes=7472086-7472094
      ...
      17/06/13 05:46:50 DEBUG wire: http-outgoing-2 << "Content-Length: 8[\r][\n]"
      ....
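      The 8-byte response is consistent with the Parquet file layout, which ends with a 4-byte little-endian footer length followed by the magic bytes `PAR1`. For this 7,472,094-byte file, the last 8 bytes are offsets 7472086-7472093. A reader would then fetch the footer metadata that precedes this trailer. The following is a minimal stdlib-only sketch of that arithmetic. `ParquetTrailer` and `footerRange` are hypothetical names, not parquet-mr API.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

object ParquetTrailer {
  private val Magic: Array[Byte] = "PAR1".getBytes(StandardCharsets.US_ASCII)

  /** Given the total file length and the file's last 8 bytes
    * (4-byte little-endian footer length + "PAR1"), returns the inclusive
    * byte range [start, end] of the footer metadata preceding the trailer. */
  def footerRange(fileLength: Long, tail: Array[Byte]): (Long, Long) = {
    require(tail.length == 8, "expected exactly the last 8 bytes of the file")
    require(tail.slice(4, 8).sameElements(Magic), "missing PAR1 magic: not a Parquet file")
    // First 4 bytes of the trailer hold the footer length, little-endian
    val footerLength = ByteBuffer.wrap(tail, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt().toLong
    val end = fileLength - 8 - 1 // last byte before the 8-byte trailer
    val start = end - footerLength + 1
    (start, end)
  }
}
```

      For example, if the trailer of the 7,472,094-byte file encoded a footer length of 1234 (an illustrative value, not taken from the logs), the footer would occupy bytes 7470852-7472085.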
      

      Here's what FileScanRDD prints:

      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00004-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7473020, partition values: [empty row]
      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00011-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472503, partition values: [empty row]
      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00006-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472501, partition values: [empty row]
      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00007-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7473104, partition values: [empty row]
      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00003-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472458, partition values: [empty row]
      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00012-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472594, partition values: [empty row]
      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00001-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472984, partition values: [empty row]
      17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00014-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472720, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00008-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472339, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00015-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472437, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00013-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472312, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00002-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472191, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00005-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472239, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472094, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00010-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7471960, partition values: [empty row]
      17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00009-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7471520, partition values: [empty row]
      

      parquet-tools metadata for one of the files:

      ~$ parquet-tools meta part-00009-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet
      file:        file:/Users/michael/part-00009-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet
      creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
      extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"time","type":"long","nullable":false,"metadata":{}},{"name":"country","type":"string","nullable":true,"metadata":{}}]}
      
      file schema: spark_schema
      --------------------------------------------------------------------------------
      time:        REQUIRED INT64 R:0 D:0
      country:     OPTIONAL BINARY O:UTF8 R:0 D:1
      
      row group 1: RC:625000 TS:11201229 OFFSET:4
      --------------------------------------------------------------------------------
      time:         INT64 SNAPPY DO:0 FPO:4 SZ:2501723/5000239/2.00 VC:625000 ENC:PLAIN,BIT_PACKED
      country:      BINARY SNAPPY DO:0 FPO:2501727 SZ:4969317/6200990/1.25 VC:625000 ENC:PLAIN,BIT_PACKED,RLE
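      This metadata shows why reading the column data costs almost as much as reading the whole file. The two column chunks start at FPO 4 and FPO 2501727, with compressed sizes (the first SZ value) of 2501723 and 4969317 bytes. Together they span bytes 4 through 7471043. FileScanRDD reports this file's range as 0-7471520, so the column data covers all but 480 bytes of the file: the 4-byte leading magic plus the footer and its 8-byte trailer. The row count (RC) itself is recorded in that footer metadata. Below is a small stdlib-only Scala sketch that computes the span from `parquet-tools meta` column lines. The regex assumes the output format shown above, and the names are hypothetical.

```scala
object ChunkSpan {
  // Matches column-chunk lines such as
  // "time:  INT64 SNAPPY DO:0 FPO:4 SZ:2501723/5000239/2.00 VC:625000 ..."
  // FPO = first data page offset; SZ = compressed/uncompressed/ratio.
  private val Chunk = """.*\bFPO:(\d+)\s+SZ:(\d+)/.*""".r

  /** Returns (first byte, one past the last byte) covered by the listed column chunks. */
  def span(lines: Seq[String]): (Long, Long) = {
    // Lines without FPO/SZ (e.g. "row group 1: RC:..." headers) are skipped
    val chunks = lines.collect { case Chunk(fpo, sz) => (fpo.toLong, fpo.toLong + sz.toLong) }
    require(chunks.nonEmpty, "no column-chunk lines found")
    (chunks.map(_._1).min, chunks.map(_._2).max)
  }
}
```

      For the two column lines above, `span` returns `(4, 7471044)`.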
      


          People

            Assignee: Unassigned
            Reporter: Michael Spector (spektom)
            Votes: 0
            Watchers: 6
