  Apache NiFi
  NIFI-6640

Schema Inference of UNION/CHOICE types not handled correctly



    Description

      When reading the following CSV:

      Id|Value
      1|3
      2|3.75
      3|3.85
      4|8
      5|2.0
      6|4.0
      7|some_string
      

      and attempting to pass it through a ConvertRecord processor, the following exception is thrown:

      2019-09-06 18:25:48,936 ERROR [Timer-Driven Process Thread-2] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=07635c71-016d-1000-3847-ff916164b32a] Failed to process StandardFlowFileRecord[uuid=4b4ab01a-b349-4f83-9b25-6a58d0b297c1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1567786888281-1, container=default, section=1], offset=326669, length=56],offset=0,name=4b4ab01a-b349-4f83-9b25-6a58d0b297c1,size=56]; will route to failure: org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
      org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
              at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:170)
              at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2925)
              at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
              at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
              at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
              at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:205)
              at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [some_string] of type class java.lang.String for field Value to any of the following available Sub-Types for a Choice: [FLOAT, INT]
              at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:119)
              at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
              at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:156)
              ... 13 common frames omitted
      Caused by: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [some_string] of type class java.lang.String for field Value to any of the following available Sub-Types for a Choice: [FLOAT, INT]
              at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:166)
              at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:116)
              at org.apache.nifi.csv.AbstractCSVRecordReader.convert(AbstractCSVRecordReader.java:86)
              at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:105)
              ... 15 common frames omitted
      

      The problem is that FieldTypeInference maintains both a list of possibleDataTypes and a singleDataType. As long as a newly added dataType is not in a "wider" relationship with the previously seen types, it is added to possibleDataTypes. But once a "wider" type is added, it is set as the singleDataType, while possibleDataTypes remains intact.

      However, when the actual dataType is determined, possibleDataTypes takes precedence whenever it is non-empty, and singleDataType is ignored.

      So in our example, a FieldTypeInference is created with (FLOAT, INT) as possibleDataTypes and STRING as singleDataType; FLOAT or INT is chosen, and the writer then attempts to write "some_string" as a float or integer.
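The flawed precedence can be sketched as a toy model (a simplified re-implementation for illustration only; the enum, field, and method names mirror the description above, not the actual org.apache.nifi.schema.inference API):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified sketch of the inference flaw described above.
public class FieldTypeInferenceSketch {
    enum Type { INT, FLOAT, STRING }

    // In this toy model only STRING widens the numeric types; INT and
    // FLOAT are treated as unrelated, mirroring the (FLOAT, INT) choice
    // described in the report.
    static boolean isWider(Type wider, Type narrower) {
        return wider == Type.STRING && narrower != Type.STRING;
    }

    final Set<Type> possibleDataTypes = new LinkedHashSet<>();
    Type singleDataType = null;

    void addPossibleDataType(Type type) {
        for (Type existing : possibleDataTypes) {
            if (isWider(type, existing)) {
                // Bug (as described): the wider type becomes singleDataType,
                // but possibleDataTypes is left intact.
                singleDataType = type;
                return;
            }
        }
        possibleDataTypes.add(type);
    }

    Type toDataType() {
        // Bug (as described): a non-empty possibleDataTypes wins and
        // singleDataType is ignored, so the wider STRING is lost here.
        if (!possibleDataTypes.isEmpty()) {
            return possibleDataTypes.iterator().next(); // e.g. one sub-type of the CHOICE
        }
        return singleDataType;
    }

    public static void main(String[] args) {
        FieldTypeInferenceSketch inference = new FieldTypeInferenceSketch();
        // Feed the value types seen in the sample CSV column: 1, 3.75, some_string.
        for (Type t : new Type[] { Type.INT, Type.FLOAT, Type.STRING }) {
            inference.addPossibleDataType(t);
        }
        System.out.println(inference.possibleDataTypes); // [INT, FLOAT]
        System.out.println(inference.singleDataType);    // STRING
        System.out.println(inference.toDataType());      // INT -- STRING is silently dropped
    }
}
```

Running this shows the same end state as the stack trace: the inferred type is a numeric CHOICE, and the STRING that should have subsumed it is discarded.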


      There is also an issue with the handling of multiple datatypes when writing data.
      When multiple datatypes are possible, a so-called CHOICE datatype is assigned in the inferred schema; it contains the possible datatypes in a list.
      However, most (if not all) of the time, when a concrete datatype is chosen for a given value at write time (tested with the JSON and Avro writers), the first matching type is selected from the list. In the current implementation every number type matches every number, so 3.75 may be written as an INT, resulting in data loss.

      The root of the problem is that the sub-type list is in no particular order and the first matching type wins.
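The write-time data loss can be demonstrated with a small sketch (class and method names are invented for illustration and are not the actual NiFi writer API):

```java
import java.util.List;

// Toy illustration of first-match selection over an unordered CHOICE
// list: if INT happens to come first, 3.75 is truncated on write.
public class ChoiceWriteSketch {
    enum Type { INT, FLOAT, STRING }

    // As described in the report, every number type "matches" every
    // number, so an INT entry matches 3.75 as well.
    static boolean matches(Type type, Object value) {
        switch (type) {
            case INT:
            case FLOAT:
                return value instanceof Number;
            default:
                return value instanceof String;
        }
    }

    static Object coerce(Type type, Object value) {
        switch (type) {
            case INT:   return ((Number) value).intValue();   // 3.75 -> 3, data loss
            case FLOAT: return ((Number) value).floatValue();
            default:    return String.valueOf(value);
        }
    }

    // First matching sub-type of the CHOICE wins.
    static Object writeValue(List<Type> choice, Object value) {
        for (Type type : choice) {
            if (matches(type, value)) {
                return coerce(type, value);
            }
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        // The same value, written under two orderings of the same CHOICE:
        System.out.println(writeValue(List.of(Type.INT, Type.FLOAT), 3.75)); // 3
        System.out.println(writeValue(List.of(Type.FLOAT, Type.INT), 3.75)); // 3.75
    }
}
```

Since reordering only masks the problem, a proper fix would make type selection value-aware (pick the narrowest sub-type that represents the value losslessly) rather than relying on list order.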

      Attachments

        1. NIFI-6640.template.xml
          23 kB
          Tamas Palfy



            People

              tpalfy Tamas Palfy
              Votes: 0
              Watchers: 2


                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 5.5h