Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
-
None
Description
The Avro Writer (WriteAvroResultWithExternalSchema) has a close method that is not idempotent. As a result, if the close() method is called multiple times, it can cause Exceptions to be thrown and potentially even cause data duplication or corruption, depending on the underlying OutputStream, as it could flush data to an OutputStream, and the OutputStream may block close() methods; a second call to WriteAvroResultWithExternalSchema.close() could then flush more data to the stream.
This can produce a stack trace such as:
2022-03-01 11:06:11,677 ERROR [Timer-Driven Process Thread-5] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=39b23e8a-5b1a-110d-3bcc-5dca95722468] Failed to close Record Writer: java.io.IOException: Stream is closed ↳ causes: org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=b57f14e0-2498-4e9d-8730-4b2e0c9dc874,claim=,offset=0,name=0076a439-5911-4178-b0e6-c7296c0f09ae,size=0] org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=b57f14e0-2498-4e9d-8730-4b2e0c9dc874,claim=,offset=0,name=0076a439-5911-4178-b0e6-c7296c0f09ae,size=0] at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2994) at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220) at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85) at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:251) at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:95) at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.close(WriteAvroResultWithExternalSchema.java:112) at sun.reflect.GeneratedMethodAccessor444.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254) at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38) at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240) at com.sun.proxy.$Proxy193.close(Unknown Source) at org.apache.nifi.processors.standard.ValidateRecord.closeQuietly(ValidateRecord.java:459) at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:434) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1284) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Stream is closed at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:46) at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49) at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2989) ... 29 common frames omitted
Attachments
Issue Links
- links to