Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
A couple of issues when upgrading NiFi and using a custom scripted writer with Groovy.
The scripted writer was something like:
import ...

class GroovyRecordSetWriter implements RecordSetWriter {
    ...

    @Override
    WriteResult write(Record r) throws IOException { ... }

    @Override
    String getMimeType() { ... }

    @Override
    WriteResult write(final RecordSet rs) throws IOException { ... }

    public void beginRecordSet() throws IOException { ... }

    @Override
    public WriteResult finishRecordSet() throws IOException { ... }

    @Override
    public void close() throws IOException {}

    @Override
    public void flush() throws IOException {}
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
        new GroovyRecordSetWriter(out)
    }
}

writer = new GroovyRecordSetWriterFactory()
With NIFI-6318 we changed a method in the interface RecordSetWriterFactory.
The above code works fine in NiFi 1.9.2, but it breaks after an upgrade to 1.11.4. When the Controller Service is enabled, it reports the following error:
Can't have an abstract method in a non-abstract class. The class 'GroovyRecordSetWriterFactory' must be declared abstract or the method 'org.apache.nifi.serialization.RecordSetWriter createWriter(org.apache.nifi.logging.ComponentLog, org.apache.nifi.serialization.record.RecordSchema, java.io.OutputStream, java.util.Map)' must be implemented.
However, the controller service is still enabled successfully, and the processors referencing it can be started. Running the ConvertRecord processor with the problematic controller service then throws the NPE below:
2020-11-26 15:46:13,876 ERROR [Timer-Driven Process Thread-25] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=8b5456ae-71dc-3bd3-d0c0-df50d196fc00] Failed to process StandardFlowFileRecord[uuid=adebfcf6-b449-4d01-90a7-0463930aade0,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1606401933295-1, container=default, section=1], offset=80, length=296],offset=0,name=adebfcf6-b449-4d01-90a7-0463930aade0,size=296]; will route to failure: java.lang.NullPointerException
java.lang.NullPointerException: null
    at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:151)
    at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2986)
    at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
The fix is quite simple: add an implementation of the new method to the Groovy script. Something like:
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
        new GroovyRecordSetWriter(out)
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
        return createWriter(logger, schema, out)
    }
}
However, there are two things to improve:
- if possible, the controller service should not be enabled successfully. Raising a bulletin is nice but not enough in environments where flow deployment is completely automated with no access to the UI. The bulletin is also only shown for 5 minutes before it disappears. In addition, disabling and re-enabling the controller service without changing the script body does not perform the validation/compilation again, so the bulletin is not shown a second time. This can make the problem hard to debug or locate, since the NPE does not provide much information.
- the NPE should be handled properly to provide a more meaningful message
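The two improvements above could look roughly like the following. This is a minimal, self-contained Java sketch and not NiFi's actual code: `WriterFactory`, `ScriptedWriterService`, `onEnabled`, and this `createWriter` signature are hypothetical stand-ins for the scripted controller service and the factory it gets from the script. It also assumes the NPE comes from the script never producing a usable factory instance, which the ticket does not confirm.

```java
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for RecordSetWriterFactory; not the NiFi interface.
interface WriterFactory {
    Object createWriter(OutputStream out, Map<String, String> variables);
}

// Hypothetical stand-in for the scripted controller service.
class ScriptedWriterService {
    private WriterFactory factory;   // stays null when script compilation fails
    private String compileError;     // last compilation error, if any

    // Assumed to run on every enable, even when the script body is unchanged.
    void onEnabled(WriterFactory compiled, String error) {
        this.factory = compiled;
        this.compileError = error;
        if (compiled == null) {
            // Improvement 1: refuse to enable instead of only raising a bulletin.
            throw new IllegalStateException(
                "Script did not produce a writer factory: " + error);
        }
    }

    Object createWriter(OutputStream out, Map<String, String> variables) {
        if (factory == null) {
            // Improvement 2: a descriptive error instead of a bare NPE.
            throw new IllegalStateException(
                "No writer factory available; script compilation failed: " + compileError);
        }
        return Objects.requireNonNull(
            factory.createWriter(out, variables),
            "Script factory returned a null writer");
    }
}
```

With this shape, an automated deployment would see the failure at enable time, and a processor that still reaches `createWriter` gets the compilation error in the exception message rather than a stack trace alone.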