Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
None
None
Environment: Debian, Java 11 and Java 8
Description
Assessment
AvroTypeUtil.convertUnionFieldValue has the following:
    Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
            originalValue,
            fieldSchema.getTypes().stream()
                    .filter(schema -> schema.getType() != Type.NULL)
                    .collect(Collectors.toList()),
            subSchema -> AvroTypeUtil.determineDataType(subSchema)
    );
DataTypeUtils.findMostSuitableType in turn has the following:
    DataType inferredDataType = inferDataType(value, null);
and DataTypeUtils.inferDataType in turn has the following:
    if (value instanceof Map) {
        final Map<String, ?> map = (Map<String, ?>) value;
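Here originalValue/value is a map extracted from an Avro record, so its keys are org.apache.avro.util.Utf8 objects instead of Strings. Because of type erasure, the cast to Map<String, ?> itself succeeds; the ClassCastException is thrown later, when a key is used as a String in DataTypeUtils.inferRecordDataType (see the stack traces below).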
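The following minimal, self-contained sketch illustrates why the unchecked cast does not fail on its own. It does not use Avro: StringBuilder stands in for org.apache.avro.util.Utf8 (both are CharSequence implementations that are not String), and countKeyChars is a hypothetical method that merely mimics code assuming String keys; it is not NiFi's actual inferRecordDataType.

```java
import java.util.HashMap;
import java.util.Map;

public class UncheckedMapCastDemo {

    // Hypothetical stand-in for inference code that assumes String keys.
    @SuppressWarnings("unchecked")
    public static int countKeyChars(Object value) {
        if (value instanceof Map) {
            // Generics are erased at runtime, so this cast only checks "is a Map".
            final Map<String, ?> map = (Map<String, ?>) value;
            int total = 0;
            for (Map.Entry<String, ?> entry : map.entrySet()) {
                // The compiler inserts a cast to String here; a non-String key
                // (such as Avro's Utf8) makes this line throw ClassCastException.
                String key = entry.getKey();
                total += key.length();
            }
            return total;
        }
        return 0;
    }

    public static void main(String[] args) {
        Map<CharSequence, Object> avroLikeMap = new HashMap<>();
        // StringBuilder plays the role of Utf8: a CharSequence that is not a String.
        avroLikeMap.put(new StringBuilder("tags"), "value");
        try {
            countKeyChars(avroLikeMap);
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```

Running main prints ClassCastException: the failure surfaces at the key access, not at the cast, which matches the stack traces pointing at inferRecordDataType rather than inferDataType.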
More generally, though, the problem is that we are now dealing with an Avro-specific object where previously only NiFi-specific value objects were processed.
There are multiple approaches to fix this:
1. Treat this special case as a technical issue: accept that Avro objects can leak into this layer and prepare the layer to behave as needed, i.e. transform the Avro map into one whose keys are String objects.
2. Treat this as an error-handling issue: inference can be a best-effort attempt, and in case of an error we can fall back to the original logic. Inference was added here to choose the best-matching type from a UNION/CHOICE. If inference doesn't yield a result, the original logic goes over all types within the UNION/CHOICE and selects the first compatible one. When a Map is in a UNION/CHOICE, the other types will not pose compatibility issues, so the original logic would work well.
(1. and 2. are not mutually exclusive.)
3. Enhance the inference logic so that the Avro object is converted to a general object before inference occurs. This would prevent Avro (or other third-party-specific) objects from leaking into the framework's format-agnostic layer.
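Approaches 1 and 2 can be sketched roughly as below. This is a hypothetical illustration, not NiFi's actual fix: normalizeKeys and this findMostSuitableType are made-up names, union member types are modelled as plain strings instead of Avro Schema objects, the compatibility check is passed in as a predicate, and StringBuilder again stands in for Utf8.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;

public class UnionTypeSelectionSketch {

    // Approach 1: copy a map so every key becomes a String (Utf8 is a CharSequence,
    // so toString() yields its text). Nested maps and lists are copied recursively.
    public static Object normalizeKeys(Object value) {
        if (value instanceof Map) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                Object key = entry.getKey();
                copy.put(key == null ? null : key.toString(), normalizeKeys(entry.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object element : (List<?>) value) {
                copy.add(normalizeKeys(element));
            }
            return copy;
        }
        return value;
    }

    // Approach 2: inference is best effort. A RuntimeException (such as the
    // ClassCastException in this issue) counts as "no result", and we fall back
    // to the original logic: the first candidate compatible with the value.
    public static <T> Optional<T> findMostSuitableType(
            Object value,
            List<T> candidates,
            Function<Object, Optional<T>> inference,
            BiPredicate<Object, T> isCompatible) {
        Optional<T> inferred;
        try {
            inferred = inference.apply(value);
        } catch (RuntimeException e) {
            inferred = Optional.empty();
        }
        if (inferred.isPresent()) {
            return inferred;
        }
        for (T candidate : candidates) {
            if (isCompatible.test(value, candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<CharSequence, Object> avroLike = new LinkedHashMap<>();
        avroLike.put(new StringBuilder("tags"), "value"); // stand-in for a Utf8 key

        Map<?, ?> normalized = (Map<?, ?>) normalizeKeys(avroLike);
        System.out.println(normalized.keySet().iterator().next() instanceof String);

        List<String> unionTypes = Arrays.asList("string", "map");
        Optional<String> chosen = findMostSuitableType(
                avroLike,
                unionTypes,
                v -> { throw new ClassCastException("simulated inference failure"); },
                (v, type) -> type.equals("map") ? v instanceof Map : v instanceof CharSequence);
        System.out.println(chosen.orElse("none"));
    }
}
```

Running main prints true and then map: the key has become a String, and because the simulated inference throws, the fallback selects the first compatible union member.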
Issue report
Severe regression in version 1.11.3 compared to 1.9.2:
Record-based processors can no longer deserialize Avro messages. Examples:
- ConsumeKafkaRecord: with embedded Avro schema or using Confluent Schema Registry
- ConvertRecord: with embedded Avro schema or using Confluent Schema Registry, too
- probably others as well...
Error messages:
ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d] Failed to process StandardFlowFileRecord[uuid=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1584002690648-1091, container=default, section=67], offset=276387, length=3487] ,offset=0,name=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,size=3487]; will route to failure: Could not parse incoming data
ConsumeKafkaRecord_2_0[id=d9ebdbda-51b7-38ce-b43e-3197322bd2e1] Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: java.lang.ClassCastException
However, messages with an embedded schema can be converted to JSON without problems using ConvertAvroToJSON.
The behavior has been confirmed with various flows and configurations on different Java versions. Downgrading to NiFi 1.9.2 resolves the issue; upgrading again to 1.11.3 brings it back.
Please find attached a minimal example template...
Stack traces:
2020-03-12 09:37:16,628 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.avro.AvroTypeUtil fail to convert field tags
java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
    at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
    at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
    at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
    at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
    at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
    at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
    at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
    at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
    at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
    at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
    at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

2020-03-12 09:37:16,632 ERROR [Timer-Driven Process Thread-4] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d] Failed to process StandardFlowFileRecord[uuid=33856f9d-1991-4c95-90c2-3ffd032fc840,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1584005835899-1, container=default, section=1], offset=851, length=3487],offset=0,name=33856f9d-1991-4c95-90c2-3ffd032fc840,size=3487]; will route to failure: org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
    at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:171)
    at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
    at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
    at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
    at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
    ... 13 common frames omitted
Caused by: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
    at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
    at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
    at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
    at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
    at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
    at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
    at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
    ... 15 common frames omitted
Attachments
Issue Links
- links to