Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
1.18.0, 1.20.0, 1.23.0
-
None
Description
We use ForkEnrichement - JoinEnrichment pattern and want to include filtering in join SQL. Filter value is coming from FlowFile attribute
${test} = 'NewValue' SELECT original.*, enrichment.*,'${test}' FROM original LEFT OUTER JOIN enrichment ON original.Underlying = enrichment.Underlying WHERE enrichment.MyField = '${test}'
However this doesnt work because JoinEnrichment doesnt use evaluateAttributeExpressions
Additionally in version 1.18,1.23 - doesnt allow whole query to be passed as attribute.
2023-06-28 11:07:16,611 ERROR [Timer-Driven Process Thread-7] o.a.n.processors.standard.JoinEnrichment JoinEnrichment[id=dbe156ac-0187-1000-4477-0183899e0432] Failed to join 'original' FlowFile StandardFlowFileRecord[uuid=2ab9f6ad-73a5-4763-b25e-fd26c44835e1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1687948831976-629, container=default, section=629], offset=8334082, length=600557],offset=0,name=lmr_SY08C41-1_S_514682_20230627.csv,size=600557] and 'enrichment' FlowFile StandardFlowFileRecord[uuid=e4bb7769-fdce-4dfe-af18-443676103035,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1687949723375-631, container=default, section=631], offset=5362822, length=1999502],offset=0,name=lmr_SY08C41-1_S_514682_20230627.csv,size=1999502]; routing to failure java.sql.SQLException: Error while preparing statement [${instrumentJoinSQL}] at org.apache.calcite.avatica.Helper.createException(Helper.java:56) at org.apache.calcite.avatica.Helper.createException(Helper.java:41) at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:224) at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:203) at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:99) at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:178) at org.apache.nifi.processors.standard.enrichment.SqlJoinCache.createCalciteParameters(SqlJoinCache.java:91) at org.apache.nifi.processors.standard.enrichment.SqlJoinCache.getCalciteParameters(SqlJoinCache.java:65) at org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy.join(SqlJoinStrategy.java:49) at org.apache.nifi.processors.standard.JoinEnrichment.processBin(JoinEnrichment.java:387) at org.apache.nifi.processor.util.bin.BinFiles.processBins(BinFiles.java:233) at org.apache.nifi.processors.standard.JoinEnrichment.processBins(JoinEnrichment.java:503) at org.apache.nifi.processor.util.bin.BinFiles.onTrigger(BinFiles.java:193) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1354) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.RuntimeException: parse failed: Encountered "$" at line 1, column 1. Was expecting one of: "ABS" ...
As I understand issue is in following line of code
https://github.com/apache/nifi/blob/ce0122bd2530e37eb57ace919a42bb2eb3cd5c02/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java#L441
instead of:
final String sql = context.getProperty(SQL).getValue();
it should be:
final String sql = context.getProperty(SQL).evaluateAttributeExpressions(attributes).getValue();
markap14 jgresock - can you please take a look - this is quite a big showstopper.
Documentation says that JoinEnrichment supports expression language
Attachments
Attachments
Issue Links
- is duplicated by
-
NIFI-14076 Expression Language doesn't evaluate in the JoinEnrichment processor
- Resolved
- links to