Apache NiFi / NIFI-6937

Fix: ReportLineageToAtlas creates data and queue with same qualified name leading to exception


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.11.0
    • Component/s: Extensions
    • Labels: None

    Description

      With certain setups, ReportLineageToAtlas can throw the following exception, failing to send reports to Atlas (while retrying indefinitely):

      Error running task[id=9a705e6d-0168-1000-0000-00001cacda42] due to java.lang.IllegalStateException: Duplicate key {Id='(type: nifi_queue, id: 03f60cff-0aca-4536-a4ff-00eab811600c)', traits=[], values={}}
      

      The exception is thrown from
      https://github.com/apache/nifi/blob/79a7014a95dc3087f88248c732fb1e4ad8e6e128/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java#L390
      and the root cause is that an Atlas Processor entity ends up with a nifi_queue input entity and a nifi_data DataSet input entity that share the same qualifiedName.
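      The failure mode matches `Collectors.toMap` without a merge function, which throws IllegalStateException whenever two stream elements map to the same key. A minimal, self-contained sketch of that mechanism (assuming, as the linked NotificationSender line suggests, that the input entities are collected into a map keyed by their identity; the class and variable names below are illustrative only):

      ```java
      import java.util.List;
      import java.util.Map;
      import java.util.function.Function;
      import java.util.stream.Collectors;

      public class DuplicateKeyDemo {
          public static void main(String[] args) {
              // A nifi_queue and a nifi_data input entity that ended up with the
              // same qualifiedName (both derived from the same processor UUID)
              List<String> inputEntityNames = List.of(
                      "03f60cff-0aca-4536-a4ff-00eab811600c@cluster1",
                      "03f60cff-0aca-4536-a4ff-00eab811600c@cluster1");

              try {
                  // Collectors.toMap with no merge function throws on duplicate keys
                  Map<String, String> byName = inputEntityNames.stream()
                          .collect(Collectors.toMap(Function.identity(), Function.identity()));
                  System.out.println(byName);
              } catch (IllegalStateException e) {
                  // Same "Duplicate key ..." failure as seen in the report above
                  System.out.println("IllegalStateException: " + e.getMessage());
              }
          }
      }
      ```

      Supplying a merge function to toMap (or deduplicating the entities before collecting) avoids the exception.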

      It can happen when the NiFi processor (P_Subject) that corresponds to the Atlas Processor entity

      1. has an inbound connection that is represented in Atlas by a nifi_queue entity. (There are multiple ways to enforce this; one is making sure the origin processor of the inbound queue (P_Origin, where P_Origin -> P_Subject) has a connection to another processor as well, like P_Origin -> P_Other.) The flow then looks like this:
                 /->P_Subject
                /
        P_Origin 
                \
                 \->P_Other
        
      2. also generates an input (CREATE, RECEIVE or FETCH) provenance event on its own, and does not have a special input type (like fs_path or hive_table) but uses the generic nifi_data Atlas type to represent its input (such processors are called "unknown" in the reporting task's documentation).

      See attached atlas_duplicate_key.xml for an example flow template.

      In this example flow, InvokeHTTP has an input nifi_queue entity in Atlas (see the explanation above; for more details see the Path Separation Logic section in the reporting task docs). Its qualifiedName is processorUUID@clustername, derived from the UUID of the downstream processor, which here is InvokeHTTP itself.

      InvokeHTTP also sends the incoming flowfile in the HTTP request and creates another flowfile from the HTTP response, which generates a FETCH event, which in turn generates a nifi_data entity in Atlas. Its qualifiedName is also processorUUID@clustername, using the UUID of the processor that generates the event, so again InvokeHTTP's.

      These two entities having the same qualifiedName causes the duplicate key error.
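      The collision can be made concrete with two small helpers mirroring the naming scheme described above (the helper names are hypothetical; the real reporting task builds these qualified names internally; the UUID is taken from the exception in this report as an example):

      ```java
      public class QualifiedNameCollision {
          // nifi_queue: named after the processor the queue feeds into
          static String queueQualifiedName(String downstreamProcessorUuid, String clusterName) {
              return downstreamProcessorUuid + "@" + clusterName;
          }

          // nifi_data: named after the processor that emits the provenance event
          static String dataQualifiedName(String eventProcessorUuid, String clusterName) {
              return eventProcessorUuid + "@" + clusterName;
          }

          public static void main(String[] args) {
              String invokeHttpUuid = "03f60cff-0aca-4536-a4ff-00eab811600c"; // example UUID
              String cluster = "cluster1";

              // InvokeHTTP is both the queue's downstream processor and the
              // FETCH event's source, so the two entities collide:
              String queueName = queueQualifiedName(invokeHttpUuid, cluster);
              String dataName = dataQualifiedName(invokeHttpUuid, cluster);
              System.out.println(queueName.equals(dataName)); // prints "true"
          }
      }
      ```

      A fix therefore has to disambiguate the two naming schemes (or merge such entities) rather than rely on qualifiedName alone being unique across types.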

      Attachments

        1. atlas_duplicate_key.xml (21 kB, uploaded by Tamas Palfy)


              People

                Assignee: Tamas Palfy (tpalfy)
                Reporter: Tamas Palfy (tpalfy)
                Votes: 0
                Watchers: 3


                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 50m