Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-2592

NumberFormatException: Zero length BigInteger when write.precombine.field is decimal type

    XMLWordPrintableJSON

Details

    Description

      when write.precombine.field is decimal type,write decimal will be an empty byte array, when read will throw NumberFormatException: Zero length BigInteger like below:

      2021-10-20 17:14:03
      java.lang.NumberFormatException: Zero length BigInteger
          at java.math.BigInteger.<init>(BigInteger.java:302)
          at org.apache.flink.table.data.DecimalData.fromUnscaledBytes(DecimalData.java:223)
          at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createDecimalConverter$4dc14f00$1(AvroToRowDataConverters.java:158)
          at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createNullableConverter$4568343a$1(AvroToRowDataConverters.java:94)
          at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createRowConverter$68595fbd$1(AvroToRowDataConverters.java:75)
          at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat$1.hasNext(MergeOnReadInputFormat.java:300)
          at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat$LogFileOnlyIterator.reachedEnd(MergeOnReadInputFormat.java:362)
          at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:202)
          at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
      

      analyze:

       

      HoodieAvroUtils.getNestedFieldVal will invoked to extract precombine field.

      next will invoke convertValueForAvroLogicalTypes. when field is decimal type,the bytebuffer will consumed, we should rewind.

      private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) {
        if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
          return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
        } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
          Decimal dc = (Decimal) fieldSchema.getLogicalType();
          DecimalConversion decimalConversion = new DecimalConversion();
          if (fieldSchema.getType() == Schema.Type.FIXED) {
            return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema,
                LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
          } else if (fieldSchema.getType() == Schema.Type.BYTES) {
            
          //this methoad will consume the byteBuffer
      
            return decimalConversion.fromBytes((ByteBuffer) fieldValue, fieldSchema,
                LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
          }
        }
        return fieldValue;
      }
      

      Attachments

        Issue Links

          Activity

            People

              Matrix42 Matrix42
              Matrix42 Matrix42
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: