Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-9143

Producers that implement the ServicePoolAware interface cause memory leak due to JMX references

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.14.1, 2.14.2, 2.15.0, 2.15.1
    • 2.16.0, 2.14.4, 2.15.4
    • camel-core
    • None
    • Unknown

    Description

      Description

      Producer instances that implement the ServicePoolAware interface will leak memory if their route is stopped, with new producers being leaked every time the route is started/stopped.

      Known implementations that are affected are RemoteFileProducer (ftp, sftp) and Mina2Producer.

      This is due to the behaviour that the SendProcessor which when the route is stopped it shuts down it's `producerCache` instance.

          protected void doStop() throws Exception {
              ServiceHelper.stopServices(producerCache, producer);
          }
      

      this in turn calls `stopAndShutdownService(pool)` which will call stop on the SharedProducerServicePool instance which is a NOOP however it also calls shutdown which effects a stop of the global pool (this stops all the registered services and then clears the pool.

          protected void doStop() throws Exception {
              // when stopping we intend to shutdown
              ServiceHelper.stopAndShutdownService(pool);
              try {
                  ServiceHelper.stopAndShutdownServices(producers.values());
              } finally {
                  // ensure producers are removed, and also from JMX
                  for (Producer producer : producers.values()) {
                      getCamelContext().removeService(producer);
                  }
              }
              producers.clear();
          }
      

      However no call to `context.removeService(Producer) is called for the entries from the pool only those singleton instances that were in the `producers` map hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes

                      getCamelContext().addService(answer, false);
      

      is never removed.

      Since the global pool is empty when the next request to get a producer is called a new producer is created, jmx wrapper and all, whilst the old instance remains orphaned retaining any objects that pertain to that instance.

      One workaround is for the producer to call

      getEndpoint().getCamelContext().removeService(this)

      in it's stop method, however this is fairly obscure and it would probably be better to invoke removal of the producer when it is removed from the shared pool.

      Another issue of note is that when a route is shutdown that contains a SendProcessor due to the shutdown invocation on the SharedProcessorServicePool the global pool is cleared of `everything` and remains in `Stopped` state until another route starts it (although it is still accessed and used whilst in the `Stopped` state).

      Impact

      For general use where there is no dynamic creation or passivation of routes this issue should be minimal, however in our use case where the routes are not static, there is a certain amount of recreation of routes as customer endpoints change and there is a need to passivate idle routes this causes a considerable memory leak (via SFTP in particular).

      Test Case

      package org.apache.camel.component;
      
      import com.google.common.util.concurrent.AtomicLongMap;
      
      import org.apache.camel.CamelContext;
      import org.apache.camel.Consumer;
      import org.apache.camel.Endpoint;
      import org.apache.camel.Exchange;
      import org.apache.camel.Processor;
      import org.apache.camel.Producer;
      import org.apache.camel.Route;
      import org.apache.camel.Service;
      import org.apache.camel.ServicePoolAware;
      import org.apache.camel.ServiceStatus;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.impl.DefaultComponent;
      import org.apache.camel.impl.DefaultEndpoint;
      import org.apache.camel.impl.DefaultProducer;
      import org.apache.camel.support.LifecycleStrategySupport;
      import org.apache.camel.support.ServiceSupport;
      import org.apache.camel.test.junit4.CamelTestSupport;
      import org.junit.Test;
      
      import java.util.Map;
      
      import static com.google.common.base.Preconditions.checkNotNull;
      
      /**
       * Test memory behaviour of producers using {@link ServicePoolAware} when using JMX.
       */
      public class ServicePoolAwareLeakyTest extends CamelTestSupport {
      
        private static final String LEAKY_SIEVE_STABLE = "leaky://sieve-stable?plugged=true";
        private static final String LEAKY_SIEVE_TRANSIENT = "leaky://sieve-transient?plugged=true";
      
      
        private static boolean isPatchApplied() {
          return Boolean.parseBoolean(System.getProperty("patchApplied", "false"));
        }
      
        /**
         * Component that provides leaks producers.
         */
        private static class LeakySieveComponent extends DefaultComponent {
          @Override
          protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
            boolean plugged = "true".equalsIgnoreCase((String) parameters.remove("plugged"));
            return new LeakySieveEndpoint(uri, isPatchApplied() && plugged);
          }
        }
      
        /**
         * Endpoint that provides leaky producers.
         */
        private static class LeakySieveEndpoint extends DefaultEndpoint {
      
          private final String uri;
          private final boolean plugged;
      
          public LeakySieveEndpoint(String uri, boolean plugged) {
            this.uri = checkNotNull(uri, "uri must not be null");
            this.plugged = plugged;
          }
      
          @Override
          public Producer createProducer() throws Exception {
            return new LeakySieveProducer(this, plugged);
          }
      
          @Override
          public Consumer createConsumer(Processor processor) throws Exception {
            throw new UnsupportedOperationException();
          }
      
          @Override
          public boolean isSingleton() {
            return true;
          }
      
          @Override
          protected String createEndpointUri() {
            return uri;
          }
        }
      
        /**
         * Leaky producer - implements {@link ServicePoolAware}.
         */
        private static class LeakySieveProducer extends DefaultProducer implements ServicePoolAware {
      
          private final boolean plugged;
      
          public LeakySieveProducer(Endpoint endpoint, boolean plugged) {
            super(endpoint);
            this.plugged = plugged;
          }
      
          @Override
          public void process(Exchange exchange) throws Exception {
            // do nothing
          }
      
          @Override
          protected void doStop() throws Exception {
            super.doStop();
      
            //noinspection ConstantConditions
            if (plugged) {
              // need to remove self from services since we are ServicePoolAware this will not be handled for us otherwise we
              // leak memory
              getEndpoint().getCamelContext().removeService(this);
            }
          }
        }
      
        @Override
        protected boolean useJmx() {
          // only occurs when using JMX as the GC root for the producer is through a ManagedProducer created by the
          // context.addService() invocation
          return true;
        }
      
        /**
         * Returns true if verification of state should be performed during the test as opposed to at the end.
         */
        public boolean isFailFast() {
          return false;
        }
      
        /**
         * Returns true if during fast failure we should verify that the service pool remains in the started state.
         */
        public boolean isVerifyProducerServicePoolRemainsStarted() {
          return false;
        }
      
        @Override
        public boolean isUseAdviceWith() {
          return true;
        }
      
        @Test
        public void testForMemoryLeak() throws Exception {
          registerLeakyComponent();
      
          final AtomicLongMap<String> references = AtomicLongMap.create();
      
          // track LeakySieveProducer lifecycle
          context.addLifecycleStrategy(new LifecycleStrategySupport() {
            @Override
            public void onServiceAdd(CamelContext context, Service service, Route route) {
              if (service instanceof LeakySieveProducer) {
                references.incrementAndGet(((LeakySieveProducer) service).getEndpoint().getEndpointKey());
              }
            }
      
            @Override
            public void onServiceRemove(CamelContext context, Service service, Route route) {
              if (service instanceof LeakySieveProducer) {
                references.decrementAndGet(((LeakySieveProducer) service).getEndpoint().getEndpointKey());
              }
            }
          });
      
          context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
              from("direct:sieve-transient")
                  .id("sieve-transient")
                  .to(LEAKY_SIEVE_TRANSIENT);
      
              from("direct:sieve-stable")
                  .id("sieve-stable")
                  .to(LEAKY_SIEVE_STABLE);
            }
          });
      
          context.start();
      
          for (int i = 0; i < 1000; i++) {
            ServiceSupport service = (ServiceSupport) context.getProducerServicePool();
            assertEquals(ServiceStatus.Started, service.getStatus());
            if (isFailFast()) {
              assertEquals(2, context.getProducerServicePool().size());
              assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT));
              assertEquals(1, references.get(LEAKY_SIEVE_STABLE));
            }
      
            context.stopRoute("sieve-transient");
      
            if (isFailFast()) {
              assertEquals("Expected no service references to remain", 0, references.get(LEAKY_SIEVE_TRANSIENT));
            }
      
            if (isFailFast()) {
              // looks like we cleared more than just our route, we've stopped and cleared the global ProducerServicePool
              // since SendProcessor.stop() invokes ServiceHelper.stopServices(producerCache, producer); which in turn invokes
              // ServiceHelper.stopAndShutdownService(pool);.
              //
              // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is not and effects a stop of the pool.
      
              if (isVerifyProducerServicePoolRemainsStarted()) {
               assertEquals(ServiceStatus.Started, service.getStatus());
              }
              assertEquals("Expected one stable producer to remain pooled", 1, context.getProducerServicePool().size());
              assertEquals("Expected one stable producer to remain as service", 1, references.get(LEAKY_SIEVE_STABLE));
            }
      
            // Send a body to verify behaviour of send producer after another route has been stopped
            sendBody("direct:sieve-stable", "");
      
            if (isFailFast()) {
              // shared pool is used despite being 'Stopped'
              if (isVerifyProducerServicePoolRemainsStarted()) {
                assertEquals(ServiceStatus.Started, service.getStatus());
              }
      
              assertEquals("Expected only stable producer in pool", 1, context.getProducerServicePool().size());
              assertEquals("Expected no references to transient producer", 0, references.get(LEAKY_SIEVE_TRANSIENT));
              assertEquals("Expected reference to stable producer", 1, references.get(LEAKY_SIEVE_STABLE));
            }
      
            context.startRoute("sieve-transient");
      
            // ok, back to normal
            assertEquals(ServiceStatus.Started, service.getStatus());
            if (isFailFast()) {
              assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
              assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT));
              assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE));
            }
          }
      
          if (!isFailFast()) {
            assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
      
            // if not fixed these will equal the number of iterations in the loop + 1
            assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT));
            assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE));
          }
        }
      
        private void registerLeakyComponent() {
          // register leaky component
          context.addComponent("leaky", new LeakySieveComponent());
        }
      }
      

      Attachments

        Activity

          People

            davsclaus Claus Ibsen
            ukcrpb6 Bob Browning
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: