Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-12244

RemoteFileProducer stopped instead of being released to the pool when "interceptSendToEndpoint" is used

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.19.0
    • Fix Version/s: 2.21.0
    • Component/s: None
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      In our application we're using an SFTP producer with "fileExist=Move" and a specific "moveExisting" expression. I encountered a problem where this would sometimes work, and sometimes not (i.e. there would be no ".archived" file). Upon further investigation I found the problem and it seems to be a bug in Camel.

      Our SFTP endpoint looks like this:

      sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
      

      We also have an interceptor:

      route.interceptSendToEndpoint("sftp://.*").process(exchange -> LOG.info("Sending file {} to {}", ...));
      

      As I discovered, using the interceptor wraps the RemoteFileProducer with InterceptSendToEndpoint. This however changes the behavior of the ProducerCache:

      public boolean doInAsyncProducer(...) {
          ...
          return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
              ...
              if (producer instanceof ServicePoolAware) {
                  // release back to the pool
                  pool.release(endpoint, producer);
              } else if (!producer.isSingleton()) {
                  // stop and shutdown non-singleton producers as we should not leak resources
                  try {
                      ServiceHelper.stopAndShutdownService(producer);
                  } catch (Exception e) {
                      ...
                  }
              }
              ...
          });
          ...
      }
      

      RemoteFileProducer implements ServicePoolAware so it would normally go back to the pool, but InterceptSendToEndpoint does not. As a result, our producers keep getting stopped (note that RemoteFileProducer#isSingleton always returns false).

      What's more, somehow they are being reused and in the end we run into situations, where one thread is closing a producer, while another thread is trying to write with it.

      I set up some breakpoints that log the thread name and System#identityHashCode of the producer:

      2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
      2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [my_file] for exchange: ...
      2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
      doStop(), time: 1518098725112,  thread [Camel (camel-1) thread #35 - CamelInvocationHandler], producer: 889747012
      	at org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175)
      	at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102)
      	at org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142)
      	at org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196)
      	at org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164)
      	at org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211)
      	at org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450)
      	at org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178)
      	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171)
      	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
      	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
      	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
      	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
      	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
      	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
      	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
      	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
      	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
      	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
      	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
      	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      	at java.util.concurrent.FutureTask.run(FutureTask.java)
      	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
      	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      	at java.util.concurrent.FutureTask.run(FutureTask.java)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:748)
      2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Connected and logged in to: ...
      2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Disconnecting from: ...
      2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer     : About to write [my_file] to [...] from exchange [...]
      2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Stopping producer: RemoteFileProducer[...]
      2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Starting
      2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
      2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [another_file] for exchange: Exchange[...]
      2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
      handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 - CamelInvocationHandler], producer: 889747012
      	at org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81)
      	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227)
      	at org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58)
      	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167)
      	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
      	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
      	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
      	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
      	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
      	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
      	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
      	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
      	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
      	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
      	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
      	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      	at java.util.concurrent.FutureTask.run(FutureTask.java)
      	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
      	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      	at java.util.concurrent.FutureTask.run(FutureTask.java)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: [my_directory]
      	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596)
      	at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584)
      	at org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830)
      	at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
      	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
      	... 39 more
      Caused by: 4:
      	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
      	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
      	... 43 more
      Caused by: java.io.IOException: Pipe closed
      	at java.io.PipedInputStream.read(PipedInputStream.java:307)
      	at com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
      	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
      	... 44 more
      2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Exception occurred during stopping: Cannot change directory to: [my_directory]
      

      So thread #35 stopped the producer, while thread #37 was trying to use it.

      One more ugly thing about it is that when SftpOperations fail due to a closed pipe, by the time we get to RemoteFileProducer#handleFailedWrite:

      public void handleFailedWrite(...) throws Exception {
          ...
          if (isStopping() || isStopped()) {
              // if we are stopping then ignore any exception during a poll
              log.debug("Exception occurred during stopping: " + exception.getMessage());
          } else {
              log.warn("Writing file failed with: " + exception.getMessage());
              ...
              throw exception;
          }
      }
      

      the producer is already stopped, so the exception is logged on DEBUG and not rethrown.

      Note that I'm writing multiple files in parallel (three in my case), I'm using this to send data to the route ending in the SFTP endpoint:

      @Produce(uri = "direct:myDir")
      private MyDir myDir;
      ...
      myDir.sendAsync(...)
      

      where

      public interface MyDir {
          Future<?> sendAsync(...);
      }
      

      We're using Camel 2.19.0, but so far that I've looked at the github repository, the issue is most likely present in the current version too.

        Attachments

          Activity

            People

            • Assignee:
              davsclaus Claus Ibsen
              Reporter:
              kszafran Krzysztof SzafraƄski
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: