
[SPARK-27239] Processing compressed HDFS files with Spark fails with "java.lang.IllegalArgumentException: requirement failed: length (-1) cannot be negative" from Spark 2.2.x onwards


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.1, 2.2.2, 2.2.3, 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

       

      From Spark 2.2.x onwards, Spark jobs that process compressed HDFS files with a custom input file format fail with the error "java.lang.IllegalArgumentException: requirement failed: length (-1) cannot be negative". The custom input file format returns -1 as the byte length for compressed file formats because compressed HDFS files are not splittable; for these formats the split is reported with offset 0 and a byte length of -1. Spark should treat a length of -1 as a valid split for compressed file formats.
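      For illustration (a minimal sketch, not the reporter's actual code): with the Hadoop mapred API, a non-splittable compressed file is typically handed to Spark as a single whole-file split, and a custom format that cannot determine the uncompressed size may report the split's length as -1, which is the value that later fails Spark's check. The split construction below is an assumption made for this sketch; the path is taken from the reproduction step further down.

      import org.apache.hadoop.fs.Path
      import org.apache.hadoop.mapred.FileSplit

      // Hypothetical whole-file split for a non-splittable .gz file:
      // offset 0 and byte length -1 because the length is unknown up front.
      val wholeFileSplit = new FileSplit(new Path("/output656/part-r-00000.gz"), 0L, -1L, Array.empty[String])

      // When HadoopRDD records this split via InputFileBlockHolder.set(path, start, length),
      // the length of -1 fails require(length >= 0, ...) and raises the
      // IllegalArgumentException shown in the stack trace below.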

       

      We observed that earlier versions of Spark did not have this validation; it was introduced in the class InputFileBlockHolder starting with Spark 2.2.x. Spark should therefore accept -1 as a valid byte length for input splits in 2.2.x and later as well.

       

      Below is the stack trace.

       Caused by: java.lang.IllegalArgumentException: requirement failed: length (-1) cannot be negative
         at scala.Predef$.require(Predef.scala:224)
         at org.apache.spark.rdd.InputFileBlockHolder$.set(InputFileBlockHolder.scala:70)
         at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:226)
         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
         at org.apache.spark.scheduler.Task.run(Task.scala:109)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)

       

      Below is the code snippet which caused this issue.

         require(length >= 0, s"length ($length) cannot be negative") // This validation causes the failure.

       

      // org.apache.spark.rdd.InputFileBlockHolder (spark-core)
      def set(filePath: String, startOffset: Long, length: Long): Unit = {
        require(filePath != null, "filePath cannot be null")
        require(startOffset >= 0, s"startOffset ($startOffset) cannot be negative")
        require(length >= 0, s"length ($length) cannot be negative")
        inputBlock.set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
      }

       

      Steps to reproduce the issue.

       Please refer to the code below to reproduce the issue.

      import org.apache.hadoop.fs.Path
      import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

      val hadoopConf = new JobConf()
      FileInputFormat.setInputPaths(hadoopConf, new Path("/output656/part-r-00000.gz"))

      val records = sc.hadoopRDD(hadoopConf,
        classOf[com.platform.custom.storagehandler.INFAInputFormat],
        classOf[org.apache.hadoop.io.LongWritable],
        classOf[org.apache.hadoop.io.Writable])

      records.count()
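      The com.platform.custom.storagehandler.INFAInputFormat class used above is not publicly available. Purely as an assumption for reproducing the same -1-length behavior, a hypothetical stand-in (class name and split logic invented for this sketch) could report every input file as one split of length -1; it only needs to reach the InputFileBlockHolder check, not read records the way the original format does.

      import org.apache.hadoop.fs.{FileSystem, Path}
      import org.apache.hadoop.mapred.{FileInputFormat, FileSplit, InputSplit, JobConf, TextInputFormat}

      // Hypothetical stand-in input format: every input file becomes a single
      // non-splittable split reported with start 0 and length -1.
      class NegativeLengthInputFormat extends TextInputFormat {
        override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false

        override def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] =
          FileInputFormat.getInputPaths(job).map { path =>
            new FileSplit(path, 0L, -1L, Array.empty[String]): InputSplit
          }
      }

      Swapping NegativeLengthInputFormat in for INFAInputFormat in the snippet above (with LongWritable keys and Text values) should trigger the same require(length >= 0, ...) failure in InputFileBlockHolder when records.count() runs.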
      

       

          People

            Assignee: Unassigned
            Reporter: Rajesh Kumar K (rkinthali)