Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-6579

save as parquet with overwrite failed when linking with Hadoop 1.0.4

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Won't Fix
    • 1.3.0, 1.4.0
    • None
    • SQL
    • None

    Description

          df = sc.parallelize(xrange(n), 4).map(lambda x: (x, str(x) * 2,)).toDF(['int', 'str'])
          df.save("test_data", source="parquet", mode='overwrite')
          df.save("test_data", source="parquet", mode='overwrite')
      

      it failed with:

      org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 3.0 failed 1 times, most recent failure: Lost task 3.0 in stage 3.0 (TID 6, localhost): java.lang.IllegalArgumentException: You cannot call toBytes() more than once without calling reset()
      	at parquet.Preconditions.checkArgument(Preconditions.java:47)
      	at parquet.column.values.rle.RunLengthBitPackingHybridEncoder.toBytes(RunLengthBitPackingHybridEncoder.java:254)
      	at parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter.getBytes(RunLengthBitPackingHybridValuesWriter.java:68)
      	at parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:147)
      	at parquet.column.impl.ColumnWriterImpl.flush(ColumnWriterImpl.java:236)
      	at parquet.column.impl.ColumnWriteStoreImpl.flush(ColumnWriteStoreImpl.java:113)
      	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:153)
      	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
      	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
      	at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:663)
      	at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:677)
      	at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:677)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
      	at org.apache.spark.scheduler.Task.run(Task.scala:64)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      
      Driver stacktrace:
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1399)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1360)
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      

      run it again, it failed with:

      15/03/27 13:26:16 WARN FSInputChecker: Problem opening checksum file: file:/Users/davies/work/spark/tmp/test_data/_temporary/_attempt_201503271324_0011_r_000003_0/part-r-00004.parquet.  Ignoring exception: java.io.EOFException
      	at java.io.DataInputStream.readFully(DataInputStream.java:197)
      	at java.io.DataInputStream.readFully(DataInputStream.java:169)
      	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:134)
      	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:283)
      	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:427)
      	at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402)
      	at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:295)
      	at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:294)
      	at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
      	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
      	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
      	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
      	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
      	at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
      	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
      	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
      	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      Traceback (most recent call last):
        File "str.py", line 20, in <module>
          gen()
        File "str.py", line 12, in gen
          df.save("test_data", source="parquet", mode='overwrite')
        File "/Users/davies/work/spark/python/pyspark/sql/dataframe.py", line 215, in save
          self._jdf.save(source, jmode, joptions)
        File "/Users/davies/work/spark/python/lib/py4j/java_gateway.py", line 538, in __call__
          self.target_id, self.name)
        File "/Users/davies/work/spark/python/lib/py4j/protocol.py", line 300, in get_return_value
          format(target_id, '.', name), value)
      py4j.protocol.Py4JJavaError: An error occurred while calling o66.save.
      : scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: org.apache.hadoop.fs.ChecksumException: Checksum error: file:/Users/davies/work/spark/tmp/test_data/_temporary/_attempt_201503271324_0011_r_000000_0/part-r-00001.parquet at 13237760
      org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:219)
      org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:237)
      org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:176)
      org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:193)
      org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:158)
      org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:384)
      org.apache.hadoop.fs.FSInputChecker.seek(FSInputChecker.java:365)
      org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.seek(ChecksumFileSystem.java:271)
      org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
      parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:413)
      .
      .
      .
      
      org.apache.hadoop.fs.ChecksumException: Checksum error: file:/Users/davies/work/spark/tmp/test_data/_temporary/_attempt_201503271324_0011_r_000001_0/part-r-00002.parquet at 13368832
      org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:219)
      org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:237)
      org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:176)
      org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:193)
      org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:158)
      org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:384)
      org.apache.hadoop.fs.FSInputChecker.seek(FSInputChecker.java:365)
      org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.seek(ChecksumFileSystem.java:271)
      org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
      parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:413)
      .
      .
      .
      
      org.apache.hadoop.fs.ChecksumException: Checksum error: file:/Users/davies/work/spark/tmp/test_data/_temporary/_attempt_201503271324_0011_r_000002_0/part-r-00003.parquet at 13565440
      org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:219)
      org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:237)
      org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:176)
      org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:193)
      org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:158)
      org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:384)
      org.apache.hadoop.fs.FSInputChecker.seek(FSInputChecker.java:365)
      org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.seek(ChecksumFileSystem.java:271)
      org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
      parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:413)
      .
      .
      .
      
      java.lang.RuntimeException: file:/Users/davies/work/spark/tmp/test_data/_temporary/_attempt_201503271324_0011_r_000003_0/part-r-00004.parquet is not a Parquet file (too small)
      parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:408)
      org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:295)
      org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:294)
      scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
      scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
      scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
      scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
      scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
      scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
      scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
      .
      .
      .
      	at scala.collection.parallel.package$$anon$1.alongWith(package.scala:85)
      	at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)
      	at scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)
      	at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)
      	at scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)
      	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)
      	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
      	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
      	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
      	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

      It will success after delete the files.

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              davies Davies Liu
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: