Apache NiFi / NIFI-6937

Fix: ReportLineageToAtlas creates data and queue with same qualified name leading to exception


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.11.0
    • Component/s: Extensions
    • Labels:
      None

      Description

Under certain setups, ReportLineageToAtlas can throw the following exception, failing to send reports to Atlas (but retrying indefinitely):

      Error running task[id=9a705e6d-0168-1000-0000-00001cacda42] due to java.lang.IllegalStateException: Duplicate key {Id='(type: nifi_queue, id: 03f60cff-0aca-4536-a4ff-00eab811600c)', traits=[], values={}}
      

The exception originates here:
https://github.com/apache/nifi/blob/79a7014a95dc3087f88248c732fb1e4ad8e6e128/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java#L390
The problem is that an Atlas Processor entity ends up with a nifi_queue and a nifi_data DataSet input entity sharing the same qualifiedName.
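The failure mode can be reproduced in isolation. The following is a minimal sketch, not the actual NotificationSender code: `Collectors.toMap` without a merge function throws `IllegalStateException` as soon as two stream elements map to the same key, which is exactly what happens when two input entities share one qualifiedName. The entity values and names below are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {
    public static void main(String[] args) {
        // Hypothetical entities keyed by the same processorUUID@clustername
        // qualifiedName, as described in this issue: one nifi_queue, one nifi_data.
        List<String[]> inputs = List.of(
                new String[] {"nifi_queue", "03f60cff-0aca-4536-a4ff-00eab811600c@cl1"},
                new String[] {"nifi_data",  "03f60cff-0aca-4536-a4ff-00eab811600c@cl1"});

        try {
            // Fails: duplicate key, no merge function supplied.
            inputs.stream().collect(Collectors.toMap(e -> e[1], e -> e[0]));
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }

        // One possible mitigation sketch: supply a merge function that keeps
        // the first entity seen for a given qualifiedName.
        Map<String, String> deduped = inputs.stream()
                .collect(Collectors.toMap(e -> e[1], e -> e[0], (first, second) -> first));
        System.out.println("deduped size = " + deduped.size()); // prints 1
    }
}
```

Whether keeping the first entity is the right merge strategy here is a design question for the fix; the sketch only demonstrates where the exception comes from.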

      It can happen when the NiFi processor (P_Subject) that corresponds to the Atlas Processor entity

      1. has an inbound connection that is represented in Atlas by a nifi_queue entity. There are multiple ways to enforce this; one is to make sure the origin processor of the inbound queue (P_Origin, where P_Origin -> P_Subject) also has a connection to another processor, like P_Origin -> P_Other, so the flow looks like this:
                 /->P_Subject
                /
        P_Origin 
                \
                 \->P_Other
        
      2. also generates an input (CREATE, RECEIVE or FETCH) provenance event on its own and does not have a specialized input DataSet (like fs_path or hive_table), but uses the generic nifi_data Atlas type to represent its input (such processors are called "unknown" in the reporting task's documentation).

      See attached atlas_duplicate_key.xml for an example flow template.

      Here InvokeHTTP has an input nifi_queue entity in Atlas (see the explanation above; for more details see the Path Separation Logic section in the reporting task docs). Its qualifiedName is processorUUID@clustername, derived from the UUID of the processor the queue feeds into, i.e. InvokeHTTP's UUID in this case.

      InvokeHTTP also sends the incoming flowfile in the HTTP request and creates another flowfile from the HTTP response, which generates a FETCH event, which in turn produces a nifi_data entity in Atlas. Its qualifiedName is also processorUUID@clustername, using the UUID of the processor that generates the event, again InvokeHTTP's UUID.

      These two entities having the same qualifiedName causes the duplicate key error.
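The collision can be sketched as follows. The helper methods below are hypothetical simplifications, not NiFi's actual API; the point is only that both entities derive their qualifiedName from the same processor UUID and cluster name.

```java
public class QualifiedNameCollision {
    // nifi_queue: named after the UUID of the destination processor it feeds into.
    static String queueQualifiedName(String destinationProcessorUuid, String clusterName) {
        return destinationProcessorUuid + "@" + clusterName;
    }

    // nifi_data: named after the UUID of the processor that emitted the
    // CREATE/RECEIVE/FETCH provenance event.
    static String dataQualifiedName(String eventProcessorUuid, String clusterName) {
        return eventProcessorUuid + "@" + clusterName;
    }

    public static void main(String[] args) {
        String invokeHttpUuid = "9a705e6d-0168-1000-0000-00001cacda42"; // example UUID
        String cluster = "cluster1";

        // InvokeHTTP is both the destination of the inbound queue and the emitter
        // of the FETCH event, so the two qualifiedNames are identical.
        String queueName = queueQualifiedName(invokeHttpUuid, cluster);
        String dataName = dataQualifiedName(invokeHttpUuid, cluster);
        System.out.println(queueName.equals(dataName)); // prints true
    }
}
```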

        Attachments


            People

            • Assignee: Tamas Palfy (tpalfy)
            • Reporter: Tamas Palfy (tpalfy)


                Time Tracking

                • Original Estimate: Not Specified
                • Remaining Estimate: 0h
                • Time Spent: 50m
