Uploaded image for project: 'Apache NiFi'
  1. Apache NiFi
  2. NIFI-9741

Encountering "IOException: Stream is closed" when using Avro Writer

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.16.0
    • None
    • None

    Description

      The Avro Writer (WriteAvroResultWithExternalSchema) has a close method that is not idempotent. As a result, if the close() method is called multiple times, it can cause Exceptions to be thrown and potentially even cause data duplication or corruption, depending on the underlying OutputStream, as it could flush data to an OutputStream, and the OutputStream may block close() methods; a second call to WriteAvroResultWithExternalSchema.close() could then flush more data to the stream.

      This can produce a stack trace such as:

      2022-03-01 11:06:11,677 ERROR [Timer-Driven Process Thread-5] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=39b23e8a-5b1a-110d-3bcc-5dca95722468] Failed to close Record Writer: java.io.IOException: Stream is closed
      ↳ causes: org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=b57f14e0-2498-4e9d-8730-4b2e0c9dc874,claim=,offset=0,name=0076a439-5911-4178-b0e6-c7296c0f09ae,size=0]
      org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=b57f14e0-2498-4e9d-8730-4b2e0c9dc874,claim=,offset=0,name=0076a439-5911-4178-b0e6-c7296c0f09ae,size=0]
              at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2994)
              at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
              at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
              at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
              at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
              at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
              at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:251)
              at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:95)
              at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.close(WriteAvroResultWithExternalSchema.java:112)
              at sun.reflect.GeneratedMethodAccessor444.invoke(Unknown Source)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
              at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
              at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
              at com.sun.proxy.$Proxy193.close(Unknown Source)
              at org.apache.nifi.processors.standard.ValidateRecord.closeQuietly(ValidateRecord.java:459)
              at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:434)
              at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
              at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1284)
              at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
              at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
              at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: Stream is closed
              at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:46)
              at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
              at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2989)
              ... 29 common frames omitted 

      Attachments

        Issue Links

          Activity

            People

              markap14 Mark Payne
              markap14 Mark Payne
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 0.5h
                  0.5h