Flink / FLINK-17804

Follow the spec when decoding Parquet logical DECIMAL type


Details

    Description

      When reading a Parquet file produced by Spark 2.4.0 with its default configuration, Flink's ParquetRowInputFormat fails with a NumberFormatException.

      After debugging this, it seems that Flink doesn't follow the Parquet spec for how the DECIMAL logical type is encoded.

      The Parquet schema for this field is:

      optional fixed_len_byte_array(9) price_usd (DECIMAL(19,4));
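      A note on the 9-byte width. The Parquet spec limits a fixed_len_byte_array(n) DECIMAL to at most floor(log10(2^(8n-1) - 1)) base-10 digits. With 8 bytes that limit is 18 digits, so DECIMAL(19,4) needs 9 bytes.

      The Java sketch below checks that arithmetic. FixedLenDecimalWidth and its methods are hypothetical names, not Flink or parquet-mr APIs.

```java
import java.math.BigInteger;

// Hypothetical helper (not a Flink or parquet-mr API) illustrating the spec's
// precision bound for DECIMAL stored as fixed_len_byte_array(n).
final class FixedLenDecimalWidth {

    /** Max base-10 digits that fit in n bytes: floor(log10(2^(8n-1) - 1)). */
    static int maxPrecision(int numBytes) {
        // Largest positive two's-complement value representable in numBytes bytes.
        BigInteger maxUnscaled = BigInteger.ONE.shiftLeft(8 * numBytes - 1).subtract(BigInteger.ONE);
        // For x >= 1, floor(log10(x)) equals (number of decimal digits of x) - 1.
        return maxUnscaled.toString().length() - 1;
    }

    /** Smallest n such that fixed_len_byte_array(n) can hold the given precision. */
    static int minBytesForPrecision(int precision) {
        int n = 1;
        while (maxPrecision(n) < precision) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(maxPrecision(8));          // prints 18
        System.out.println(maxPrecision(9));          // prints 21
        System.out.println(minBytesForPrecision(19)); // prints 9
    }
}
```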
      

      If I understand the spec correctly, the value should contain the unscaled decimal as a big-endian two's-complement integer, and the scale (4 here) should come from the schema. Flink's RowConverter, however, treats the bytes as a base-10 UTF-8 string.

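      What Flink is essentially doing:

      val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
      // Decodes the raw bytes as UTF-8 text; BigDecimal rejects the resulting characters with NumberFormatException
      val decimal = new java.math.BigDecimal(new String(binary, "UTF-8").toCharArray)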
      

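      What I think the spec suggests (the scale of 4 comes from DECIMAL(19,4) in the schema):

      val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
      // Big-endian two's-complement unscaled value: 0x0B9F76C0 = 195000000
      val unscaled = new java.math.BigInteger(binary)
      // Apply the schema's scale: 195000000 * 10^-4 = 19500.0000
      val decimal = new java.math.BigDecimal(unscaled, 4)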
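      To reproduce the difference outside Flink, here is a self-contained Java sketch of both decodings. It makes two assumptions:
      - The column's physical type is FIXED_LEN_BYTE_ARRAY or BINARY.
      - The scale is passed in from the schema.
      ParquetDecimalSketch and its methods are hypothetical names, not Flink's RowConverter API.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch (not Flink's RowConverter) contrasting the spec-conformant
// decoding of a FIXED_LEN_BYTE_ARRAY/BINARY DECIMAL with text-based decoding.
final class ParquetDecimalSketch {

    /** Spec-conformant: the bytes are a big-endian two's-complement unscaled integer. */
    static BigDecimal decodeBinaryDecimal(byte[] bytes, int scale) {
        BigInteger unscaled = new BigInteger(bytes); // sign is taken from the high bit of bytes[0]
        return new BigDecimal(unscaled, scale);      // scale comes from the schema, not the data
    }

    /** The behaviour described in this issue: interpret the bytes as base-10 text. */
    static BigDecimal decodeAsUtf8Text(byte[] bytes) {
        return new BigDecimal(new String(bytes, StandardCharsets.UTF_8).toCharArray());
    }

    public static void main(String[] args) {
        // The value from the report, for column DECIMAL(19,4).
        byte[] binary = {0, 0, 0, 0, 0, 11, -97, 118, -64};
        System.out.println(decodeBinaryDecimal(binary, 4)); // prints 19500.0000
        try {
            decodeAsUtf8Text(binary);
        } catch (NumberFormatException e) {
            System.out.println("text decoding failed: " + e);
        }
    }
}
```

      The spec also allows DECIMAL on INT32 and INT64 columns. There the integer itself is the unscaled value, so the equivalent would be BigDecimal.valueOf(unscaled, scale).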
      

      Error stacktrace:
       

      java.lang.NumberFormatException
      	at java.math.BigDecimal.<init>(BigDecimal.java:497)
      	at java.math.BigDecimal.<init>(BigDecimal.java:383)
      	at java.math.BigDecimal.<init>(BigDecimal.java:680)
      	at org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter.addBinary(RowConverter.java:202)
      	at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
      	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
      	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
      	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:235)
      	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
      	at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
      	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
      

      Thanks!

People

    Assignee: Unassigned
    Reporter: Sergii Mikhtoniuk (sergiimk)
    Votes: 1
    Watchers: 8