Spark / SPARK-25145

Buffer size too small on spark.sql query with filterPushdown predicate=True


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.3.3
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991

      Python reproduction (the attached create_bug.py):
        import numpy as np
        import pandas as pd
        
        # Build a small pandas dataframe and convert it to a Spark dataframe
        df = pd.DataFrame({'a': np.arange(10), 'b': np.arange(10) / 2.0})
        sdf = spark.createDataFrame(df)
        
        print('Created spark dataframe:')
        sdf.show()
        
        # Save table as orc
        sdf.write.saveAsTable(format='orc', mode='overwrite', name='bjornj.spark_buffer_size_too_small_on_filter_pushdown', compression='zlib')
        
        # Ensure filterPushdown is enabled
        spark.conf.set('spark.sql.orc.filterPushdown', True)
        
        # Fetch entire table (works)
        print('Read entire table with "filterPushdown"=True')
        spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown').show()
        
        # Ensure filterPushdown is disabled
        spark.conf.set('spark.sql.orc.filterPushdown', False)
        
        # Query without filterPushdown (works)
        print('Read a selection from table with "filterPushdown"=False')
        spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown WHERE a > 5').show()
        
        # Ensure filterPushdown is enabled
        spark.conf.set('spark.sql.orc.filterPushdown', True)
        
        # Query with filterPushDown (fails)
        print('Read a selection from table with "filterPushdown"=True')
        spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown WHERE a > 5').show()
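
      To confirm which value is actually in effect before each query, the session setting can be read back (a quick check via the standard runtime-conf API):

        # Returns the effective session value as a string, e.g. 'true' or 'false'
        print(spark.conf.get('spark.sql.orc.filterPushdown'))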
        
      ~/bug_report $ pyspark
      Setting default log level to "WARN".
      To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
      2018-08-17 13:44:31,365 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
      Jupyter console 5.1.0
      
      Python 3.6.3 |Intel Corporation| (default, May 4 2018, 04:22:28)
      Type 'copyright', 'credits' or 'license' for more information
      IPython 6.3.1 -- An enhanced Interactive Python. Type '?' for help.
      
      
      
      In [1]: %run -i create_bug.py
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/ '_/
         /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
            /_/
      
      Using Python version 3.6.3 (default, May 4 2018 04:22:28)
      SparkSession available as 'spark'.
      Created spark dataframe:
      +---+---+
      | a| b|
      +---+---+
      | 0|0.0|
      | 1|0.5|
      | 2|1.0|
      | 3|1.5|
      | 4|2.0|
      | 5|2.5|
      | 6|3.0|
      | 7|3.5|
      | 8|4.0|
      | 9|4.5|
      +---+---+
      
      Read entire table with "filterPushdown"=True
      +---+---+
      | a| b|
      +---+---+
      | 1|0.5|
      | 2|1.0|
      | 3|1.5|
      | 5|2.5|
      | 6|3.0|
      | 7|3.5|
      | 8|4.0|
      | 9|4.5|
      | 4|2.0|
      | 0|0.0|
      +---+---+
      
      Read a selection from table with "filterPushdown"=False
      +---+---+
      | a| b|
      +---+---+
      | 6|3.0|
      | 7|3.5|
      | 8|4.0|
      | 9|4.5|
      +---+---+
      
      Read a selection from table with "filterPushdown"=True
      2018-08-17 13:44:48,685 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 40)
      java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
      at org.apache.orc.impl.InStream$CompressedStream.readHeader(InStream.java:212)
      at org.apache.orc.impl.InStream$CompressedStream.ensureUncompressed(InStream.java:263)
      at org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:250)
      at java.io.InputStream.read(InputStream.java:101)
      at com.google.protobuf25.CodedInputStream.refillBuffer(CodedInputStream.java:737)
      at com.google.protobuf25.CodedInputStream.isAtEnd(CodedInputStream.java:701)
      at com.google.protobuf25.CodedInputStream.readTag(CodedInputStream.java:99)
      at org.apache.orc.OrcProto$RowIndex.<init>(OrcProto.java:7609)
      at org.apache.orc.OrcProto$RowIndex.<init>(OrcProto.java:7573)
      at org.apache.orc.OrcProto$RowIndex$1.parsePartialFrom(OrcProto.java:7662)
      at org.apache.orc.OrcProto$RowIndex$1.parsePartialFrom(OrcProto.java:7657)
      at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:89)
      at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:95)
      at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:49)
      at org.apache.orc.OrcProto$RowIndex.parseFrom(OrcProto.java:7794)
      at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readRowIndex(RecordReaderUtils.java:231)
      at org.apache.orc.impl.RecordReaderImpl.readRowIndex(RecordReaderImpl.java:1281)
      at org.apache.orc.impl.RecordReaderImpl.readRowIndex(RecordReaderImpl.java:1264)
      at org.apache.orc.impl.RecordReaderImpl.pickRowGroups(RecordReaderImpl.java:918)
      at org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:949)
      at org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1116)
      at org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1151)
      at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:271)
      at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:627)
      at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initialize(OrcColumnarBatchReader.java:138)
      at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:196)
      at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:160)
      at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
      at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
      at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:109)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      2018-08-17 13:44:48,708 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 40, localhost, executor driver): java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
      
      
      

      Metadata for the test table (from orc-tools' orc-metadata):

      {
        "name": "/apps/hive/warehouse/spark_buffer_size_too_small_on_filter_pushdown/part-00000-358856bc-f771-43d1-bd83-024a288df787-c000.zlib.orc",
        "type": "struct<a:bigint,b:double>",
        "rows": 1,
        "stripe count": 1,
        "format": "0.12",
        "writer version": "ORC-135",
        "compression": "zlib",
        "compression block": 262144,
        "file length": 269,
        "content": 121,
        "stripe stats": 42,
        "footer": 82,
        "postscript": 23,
        "row index stride": 10000,
        "user metadata": {},
        "stripes": [
          { "stripe": 0, "rows": 1,
            "offset": 3, "length": 118,
            "index": 63, "data": 14, "footer": 41 }
        ]
      }
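
      Two details stand out: the failing buffer size (262144) is exactly the file's "compression block" above, i.e. the ORC default compression chunk of 256 KiB, and the stack trace dies inside RecordReaderImpl.readRowIndex / pickRowGroups, code that only runs when predicate pushdown selects row groups. That is consistent with the query failing only when filterPushdown=true. The file can also be read directly by path to take the metastore out of the repro (a sketch; the directory comes from the metadata dump above and is assumed readable from the session):

        # Read the warehouse directory directly; the path is taken from the orc-metadata output above
        spark.conf.set('spark.sql.orc.filterPushdown', 'true')
        path = '/apps/hive/warehouse/spark_buffer_size_too_small_on_filter_pushdown'
        # Expected to hit the same OrcColumnarBatchReader code path as the table query
        spark.read.orc(path).where('a > 5').show()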
      

      Workaround: set spark.sql.orc.filterPushdown = false
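
      The workaround can be applied per session from Python or SQL, or at submit time with --conf spark.sql.orc.filterPushdown=false; a minimal sketch:

        # Disable ORC predicate pushdown for the current session
        spark.conf.set('spark.sql.orc.filterPushdown', 'false')

        # Equivalent SQL form
        spark.sql('SET spark.sql.orc.filterPushdown=false')

      Note that false is also the default for this option in the 2.3 line, so only sessions that enable pushdown explicitly hit this path.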

      Attachments

        1. report.txt (24 kB), uploaded by Bjørnar Jensen
        2. create_bug.py (1 kB), uploaded by Bjørnar Jensen

      People

        Assignee: Unassigned
        Reporter: Bjørnar Jensen (bjensen)
        Votes: 0
        Watchers: 3
