Apache NiFi / NIFI-5664

LookupRecord cannot handle JSON array properties from MongoDBLookupService

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.8.0, 1.7.1
    • Fix Version/s: 1.8.0
    • Component/s: Core Framework
    • Labels:
      None

      Description

      We are using a LookupRecord processor to add an array property to a Record. The LookupRecord's `Lookup Service` is a MongoDBLookupService. The idea is for the LookupRecord to query Mongo documents by the Record's `_id`, extract the array property `f` from the matching document, and insert that array into the Record at the RecordPath `/f`. This flow fails with the conversion error copied below, and the FlowFile cannot proceed (it is routed to Failure/Unmatched). It appears that the `MongoDBLookupService` (or perhaps its `MongoDBControllerService`) stores Mongo document arrays as `ArrayList`s, which `DataTypeUtils#toArray` does not support. A simple fix appears to be to add a few lines to `DataTypeUtils#toArray` so that it accepts `ArrayList`s. I will provide a PR for this issue.

      This issue appears in 1.7.1 and is still present in 1.8.0-SNAPSHOT (2018-10-02, b4c8e0179). I am not aware of any workaround for enriching a Record with an array from Mongo.
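
      The proposed change can be sketched roughly as follows. This is a minimal, standalone illustration and not the NiFi source: the class `ToArraySketch`, its method signature, the null handling, and the use of `IllegalArgumentException` in place of `IllegalTypeConversionException` are all assumptions made so the example compiles on its own. The only point it shows is the extra `List` branch that would let an `ArrayList` through.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DataTypeUtils#toArray; names and signature are assumptions.
final class ToArraySketch {

    private ToArraySketch() {
    }

    static Object[] toArray(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        // Values that are already Object[] pass through unchanged.
        if (value instanceof Object[]) {
            return (Object[]) value;
        }

        // The added branch: accept any java.util.List, which covers the
        // ArrayList instances that arrive from the Mongo lookup.
        if (value instanceof List) {
            return ((List<?>) value).toArray();
        }

        // Stand-in for IllegalTypeConversionException in the real code base.
        throw new IllegalArgumentException("Cannot convert value [" + value + "] of type "
                + value.getClass() + " to Object Array for field " + fieldName);
    }

    public static void main(final String[] args) {
        final List<Object> fromMongo = new ArrayList<>(Arrays.asList("a", "b", "c"));
        final Object[] converted = toArray(fromMongo, "f");
        System.out.println(converted.length); // prints 3
    }
}
```

      Checking for `List` rather than `ArrayList` specifically is a design choice in this sketch; it would also cover other list implementations a lookup service might return. The elements themselves (here, Mongo `Document` objects) are copied as-is, so any per-element conversion would still be up to the caller.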

      LookupRecord Configuration

      Property                  Value
      Result RecordPath         /f
      Record Result Contents    Insert Entire Record
      _id                       /_id

      MongoDBLookupService Configuration

      Property                  Value
      Lookup Value Field        f

      Conversion error (with irrelevant Mongo document and schema properties elided):

      2018-10-04 15:00:31,348 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.LookupRecord LookupRecord[id=339bf72c-91c3-36f0-ac14-bc8b734e2582] Failed to write MapRecord[{... f=[Document{{fid=5762addaffa7cf0980267acb, l=1.0}}, Document{{fid=5762adf32fc74d0418104201, l=2.12}}, Document{{fid=5762addd18b8b607802fec6f, l=3.32}}, Document{{fid=587542dd638e870838a79d08, l=1.0}}, Document{{fid=5762addbffa7cf0980267acd, l=3.32}}], i=[Ljava.lang.Object;@35dee8fc, ...] with schema {
        "namespace": "nifi",
        "name": "MongoPosting",
        "type": "record",
        "fields": [
          ...
          { "name": "f", "type": [
              "null",
              { "type": "array", "items": {
                  "type": "record", "name": "PostingFeatureDetails", "fields": [
                      { "name": "fid", "type": "string" },
                      { "name": "l" , "type": ["null", "double"] }
                  ]}
              }
          ]},
          ...
        ]
      } as a JSON Object due to org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [[Document{{fid=5762addaffa7cf0980267acb, l=1.0}}, Document{{fid=5762adf32fc74d0418104201, l=2.12}}, Document{{fid=5762addd18b8b607802fec6f, l=3.32}}, Document{{fid=587542dd638e870838a79d08, l=1.0}}, Document{{fid=5762addbffa7cf0980267acd, l=3.32}}]] of type class java.util.ArrayList to Object Array for field f: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [[Document{{fid=5762addaffa7cf0980267acb, l=1.0}}, Document{{fid=5762adf32fc74d0418104201, l=2.12}}, Document{{fid=5762addd18b8b607802fec6f, l=3.32}}, Document{{fid=587542dd638e870838a79d08, l=1.0}}, Document{{fid=5762addbffa7cf0980267acd, l=3.32}}]] of type class java.util.ArrayList to Object Array for field f
      org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [[Document{{fid=5762addaffa7cf0980267acb, l=1.0}}, Document{{fid=5762adf32fc74d0418104201, l=2.12}}, Document{{fid=5762addd18b8b607802fec6f, l=3.32}}, Document{{fid=587542dd638e870838a79d08, l=1.0}}, Document{{fid=5762addbffa7cf0980267acd, l=3.32}}]] of type class java.util.ArrayList to Object Array for field f
          at org.apache.nifi.serialization.record.util.DataTypeUtils.toArray(DataTypeUtils.java:347)
          at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:153)
          at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:115)
          at org.apache.nifi.json.WriteJsonResult.writeValue(WriteJsonResult.java:284)
          at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:187)
          at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:136)
          at org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
          at org.apache.nifi.processors.standard.AbstractRouteRecord$1.process(AbstractRouteRecord.java:148)
          at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2218)
          at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2186)
          at org.apache.nifi.processors.standard.AbstractRouteRecord.onTrigger(AbstractRouteRecord.java:121)
          at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
          at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
          at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
          at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      2018-10-04 15:00:31,349 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.LookupRecord LookupRecord[id=339bf72c-91c3-36f0-ac14-bc8b734e2582] Failed to process StandardFlowFileRecord[uuid=29b498fe-787f-4c62-a237-f95a8ebcb0b2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1538665215854-2, container=default, section=2], offset=100486, length=635840],offset=624238,name=1675395032391,size=11602]: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [[Document{{fid=5762addaffa7cf0980267acb, l=1.0}}, Document{{fid=5762adf32fc74d0418104201, l=2.12}}, Document{{fid=5762addd18b8b607802fec6f, l=3.32}}, Document{{fid=587542dd638e870838a79d08, l=1.0}}, Document{{fid=5762addbffa7cf0980267acd, l=3.32}}]] of type class java.util.ArrayList to Object Array for field f
      org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [[Document{{fid=5762addaffa7cf0980267acb, l=1.0}}, Document{{fid=5762adf32fc74d0418104201, l=2.12}}, Document{{fid=5762addd18b8b607802fec6f, l=3.32}}, Document{{fid=587542dd638e870838a79d08, l=1.0}}, Document{{fid=5762addbffa7cf0980267acd, l=3.32}}]] of type class java.util.ArrayList to Object Array for field f
          at org.apache.nifi.serialization.record.util.DataTypeUtils.toArray(DataTypeUtils.java:347)
          at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:153)
          at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:115)
          at org.apache.nifi.json.WriteJsonResult.writeValue(WriteJsonResult.java:284)
          at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:187)
          at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:136)
          at org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
          at org.apache.nifi.processors.standard.AbstractRouteRecord$1.process(AbstractRouteRecord.java:148)
          at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2218)
          at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2186)
          at org.apache.nifi.processors.standard.AbstractRouteRecord.onTrigger(AbstractRouteRecord.java:121)
          at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
          at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
          at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
          at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)

              People

              • Assignee: Unassigned
              • Reporter: Carl Gieringer (carlgieringer)