Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
Description
when write.precombine.field is decimal type,write decimal will be an empty byte array, when read will throw NumberFormatException: Zero length BigInteger like below:
2021-10-20 17:14:03 java.lang.NumberFormatException: Zero length BigInteger at java.math.BigInteger.<init>(BigInteger.java:302) at org.apache.flink.table.data.DecimalData.fromUnscaledBytes(DecimalData.java:223) at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createDecimalConverter$4dc14f00$1(AvroToRowDataConverters.java:158) at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createNullableConverter$4568343a$1(AvroToRowDataConverters.java:94) at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createRowConverter$68595fbd$1(AvroToRowDataConverters.java:75) at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat$1.hasNext(MergeOnReadInputFormat.java:300) at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat$LogFileOnlyIterator.reachedEnd(MergeOnReadInputFormat.java:362) at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:202) at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
analyze:
HoodieAvroUtils.getNestedFieldVal will invoked to extract precombine field.
next will invoke convertValueForAvroLogicalTypes. when field is decimal type,the bytebuffer will consumed, we should rewind.
private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) { if (fieldSchema.getLogicalType() == LogicalTypes.date()) { return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())); } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) { Decimal dc = (Decimal) fieldSchema.getLogicalType(); DecimalConversion decimalConversion = new DecimalConversion(); if (fieldSchema.getType() == Schema.Type.FIXED) { return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema, LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); } else if (fieldSchema.getType() == Schema.Type.BYTES) { //this methoad will consume the byteBuffer return decimalConversion.fromBytes((ByteBuffer) fieldValue, fieldSchema, LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); } } return fieldValue; }
Attachments
Issue Links
- links to