  1. Apache NiFi
  2. NIFI-11671

JoinEnrichment SQL strategy doesn't allow attributes in join statement


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.18.0, 1.20.0, 1.23.0
    • Fix Version/s: 2.0.0-M1
    • Component/s: Extensions
    • Labels: None

    Description

      We use the ForkEnrichment - JoinEnrichment pattern and want to include filtering in the join SQL. The filter value comes from a FlowFile attribute:

      ${test}  = 'NewValue'
      
      SELECT original.*, enrichment.*,'${test}'
      FROM original 
      LEFT OUTER JOIN enrichment 
      ON original.Underlying = enrichment.Underlying
      WHERE enrichment.MyField = '${test}'
      

      However, this doesn't work because JoinEnrichment doesn't call evaluateAttributeExpressions.

      Additionally, versions 1.18 and 1.23 don't allow the whole query to be passed as an attribute:

       

      2023-06-28 11:07:16,611 ERROR [Timer-Driven Process Thread-7] o.a.n.processors.standard.JoinEnrichment JoinEnrichment[id=dbe156ac-0187-1000-4477-0183899e0432] Failed to join 'original' FlowFile StandardFlowFileRecord[uuid=2ab9f6ad-73a5-4763-b25e-fd26c44835e1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1687948831976-629, container=default, section=629], offset=8334082, length=600557],offset=0,name=lmr_SY08C41-1_S_514682_20230627.csv,size=600557] and 'enrichment' FlowFile StandardFlowFileRecord[uuid=e4bb7769-fdce-4dfe-af18-443676103035,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1687949723375-631, container=default, section=631], offset=5362822, length=1999502],offset=0,name=lmr_SY08C41-1_S_514682_20230627.csv,size=1999502]; routing to failure
      java.sql.SQLException: Error while preparing statement [${instrumentJoinSQL}]
          at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
          at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
          at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:224)
          at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:203)
          at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:99)
          at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:178)
          at org.apache.nifi.processors.standard.enrichment.SqlJoinCache.createCalciteParameters(SqlJoinCache.java:91)
          at org.apache.nifi.processors.standard.enrichment.SqlJoinCache.getCalciteParameters(SqlJoinCache.java:65)
          at org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy.join(SqlJoinStrategy.java:49)
          at org.apache.nifi.processors.standard.JoinEnrichment.processBin(JoinEnrichment.java:387)
          at org.apache.nifi.processor.util.bin.BinFiles.processBins(BinFiles.java:233)
          at org.apache.nifi.processors.standard.JoinEnrichment.processBins(JoinEnrichment.java:503)
          at org.apache.nifi.processor.util.bin.BinFiles.onTrigger(BinFiles.java:193)
          at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1354)
          at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
          at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
          at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:750)
      Caused by: java.lang.RuntimeException: parse failed: Encountered "$" at line 1, column 1.
      Was expecting one of:
          "ABS" ...
       

      As I understand it, the issue is in the following line of code:
      https://github.com/apache/nifi/blob/ce0122bd2530e37eb57ace919a42bb2eb3cd5c02/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java#L441

      instead of:

      final String sql = context.getProperty(SQL).getValue();
      

      it should be:

      final String sql = context.getProperty(SQL).evaluateAttributeExpressions(attributes).getValue();
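
To illustrate why the unevaluated property fails, here is a minimal, self-contained sketch of attribute substitution. It is not NiFi's Expression Language implementation: the class `AttributeSubstitutionSketch`, its `resolve` method, and the simple `${name}` pattern are hypothetical stand-ins that only handle plain attribute references. The point is that the SQL parser should receive the resolved text rather than the literal `${...}` placeholder, which it rejects at the first `$`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; NiFi's real Expression Language is far richer.
public class AttributeSubstitutionSketch {

    // Matches plain ${name} references (letters, digits, '_' and '.').
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([A-Za-z0-9_.]+)\\}");

    // Replaces each ${name} with the matching attribute value.
    // Assumption of this sketch: unknown names become empty strings.
    static String resolve(String sql, Map<String, String> attributes) {
        Matcher matcher = REFERENCE.matcher(sql);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            // quoteReplacement keeps '$' and '\' in the value from being treated specially.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> attributes = Collections.singletonMap("test", "NewValue");
        String sql = "SELECT original.*, enrichment.* FROM original "
                + "LEFT OUTER JOIN enrichment ON original.Underlying = enrichment.Underlying "
                + "WHERE enrichment.MyField = '${test}'";
        // Without this step the parser sees the raw placeholder text.
        System.out.println(resolve(sql, attributes));
    }
}
```

Because the attribute value is spliced into the SQL text, a value that contains a single quote would change the query, so the attribute should come from a trusted source.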
      

      markap14, jgresock: can you please take a look? This is quite a big showstopper.

      The documentation says that JoinEnrichment supports Expression Language.

      Attachments

        1. screenshot-2.png
          39 kB
          Philipp Korniets
        2. screenshot-1.png
          218 kB
          Philipp Korniets

            People

              Assignee: markap14 Mark Payne
              Reporter: iiojj2 Philipp Korniets

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 20m