HUDI-2875

Concurrent call to HoodieMergeHandler causes parquet corruption



    Description

      Problem:

      Corrupted parquet files are sometimes generated, and an exception is thrown when they are read back.

      e.g.

       
      Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file <FilePath>
          at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
          at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
          at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
          at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
          at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
          at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          ... 4 more
      Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col  required binary col
          at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
          at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
          at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
          at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
          at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
          at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
          at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
          at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
          at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
          at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
          at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
          at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
          at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
          at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
          at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
          at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
          at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
          at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
          ... 11 more
      Caused by: java.io.EOFException
          at java.io.DataInputStream.readFully(DataInputStream.java:197)
          at java.io.DataInputStream.readFully(DataInputStream.java:169)
          at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
          at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
          at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
          at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
       

      How to reproduce:

      We need a way to interrupt a single task without shutting down the JVM; Spark speculation is one such way. When speculation is triggered, other tasks running on the same executor risk writing an incorrect parquet file. This does not always show up as a corrupted file: roughly half of the affected tasks throw an exception on read, while a few succeed without any visible sign of damage.
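
      A minimal sketch of a reproduction setup, assuming a plain Spark driver running a Hudi upsert. The speculation keys are standard Spark configuration; the aggressive interval/multiplier/quantile values below are only illustrative.

      import org.apache.spark.SparkConf;
      import org.apache.spark.sql.SparkSession;

      public class SpeculationRepro {
        public static void main(String[] args) {
          // Aggressive speculation makes Spark kill and relaunch "slow" task attempts,
          // i.e. it interrupts tasks on an executor without shutting down the JVM.
          SparkConf conf = new SparkConf()
              .setAppName("hudi-merge-handle-repro")
              .set("spark.speculation", "true")
              .set("spark.speculation.interval", "100ms")  // check for stragglers often (illustrative)
              .set("spark.speculation.multiplier", "1.1")  // mark slightly slower attempts as stragglers
              .set("spark.speculation.quantile", "0.5");   // start speculating once half the tasks finish

          SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
          // ... run a Hudi upsert that rewrites existing file groups (i.e. goes through the merge path),
          // then read the resulting parquet files back and watch for ParquetDecodingException ...
          spark.stop();
        }
      }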

      Root cause:

      ParquetWriter is not thread-safe. Its callers must ensure that it is never invoked concurrently from multiple threads.

      In the following code: 

      https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103

      We call both write and close on the parquet writer concurrently: data may still be written while close is running. In close, the compressor (the class parquet uses for compression, which keeps a stateful data structure inside) is cleared and returned to a pool for later reuse. Because of the concurrent writes mentioned above, data can keep being pushed into the compressor even after it has been cleared. The compressor also has a mechanism that detects some kinds of invalid use, which is why some of these invalid calls throw an exception instead of silently producing a corrupted parquet file.
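
      The essence of a fix is to guarantee that write and close never run at the same time. Below is a minimal sketch of such a guard, assuming a hypothetical wrapper around the writer; the class and interface names are illustrative, not Hudi's actual change. In SparkMergeHelper the same effect could presumably be achieved by making sure the BoundedInMemoryExecutor has fully finished (or been shut down) before the merge handle, and hence its ParquetWriter, is closed in the finally block.

      import java.io.Closeable;
      import java.io.IOException;

      // Hypothetical wrapper: serializes write() and close() so that close() can never
      // clear and pool the underlying compressor while another thread is still writing.
      public class SynchronizedRecordWriter<R> implements Closeable {

        // Hypothetical minimal interface over an underlying ParquetWriter.
        public interface RecordWriter<T> extends Closeable {
          void write(T record) throws IOException;
        }

        private final Object lock = new Object();
        private final RecordWriter<R> delegate;
        private boolean closed = false;

        public SynchronizedRecordWriter(RecordWriter<R> delegate) {
          this.delegate = delegate;
        }

        public void write(R record) throws IOException {
          synchronized (lock) {
            if (closed) {
              throw new IOException("write called after the writer was closed");
            }
            delegate.write(record);
          }
        }

        @Override
        public void close() throws IOException {
          synchronized (lock) {
            if (!closed) {
              closed = true;
              // The compressor is only cleared and returned to the pool once all
              // in-flight writes have completed.
              delegate.close();
            }
          }
        }
      }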

      Validation:

      The current solution has been validated in a production environment. One signal that the fix is effective is that no task fails with errors such as "BlockCompressorStream: write beyond end of stream", because the BlockCompressorStream sanity check is no longer triggered by concurrent writes.
