Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
2.14.1, 2.14.2, 2.15.0, 2.15.1
-
None
-
Unknown
Description
Description
Producer instances that implement the ServicePoolAware interface will leak memory if their route is stopped, with new producers being leaked every time the route is started/stopped.
Known implementations that are affected are RemoteFileProducer (ftp, sftp) and Mina2Producer.
This is due to the behaviour that the SendProcessor which when the route is stopped it shuts down it's `producerCache` instance.
protected void doStop() throws Exception { ServiceHelper.stopServices(producerCache, producer); }
this in turn calls `stopAndShutdownService(pool)` which will call stop on the SharedProducerServicePool instance which is a NOOP however it also calls shutdown which effects a stop of the global pool (this stops all the registered services and then clears the pool.
protected void doStop() throws Exception { // when stopping we intend to shutdown ServiceHelper.stopAndShutdownService(pool); try { ServiceHelper.stopAndShutdownServices(producers.values()); } finally { // ensure producers are removed, and also from JMX for (Producer producer : producers.values()) { getCamelContext().removeService(producer); } } producers.clear(); }
However no call to `context.removeService(Producer) is called for the entries from the pool only those singleton instances that were in the `producers` map hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes
getCamelContext().addService(answer, false);
is never removed.
Since the global pool is empty when the next request to get a producer is called a new producer is created, jmx wrapper and all, whilst the old instance remains orphaned retaining any objects that pertain to that instance.
One workaround is for the producer to call
getEndpoint().getCamelContext().removeService(this)
in it's stop method, however this is fairly obscure and it would probably be better to invoke removal of the producer when it is removed from the shared pool.
Another issue of note is that when a route is shutdown that contains a SendProcessor due to the shutdown invocation on the SharedProcessorServicePool the global pool is cleared of `everything` and remains in `Stopped` state until another route starts it (although it is still accessed and used whilst in the `Stopped` state).
Impact
For general use where there is no dynamic creation or passivation of routes this issue should be minimal, however in our use case where the routes are not static, there is a certain amount of recreation of routes as customer endpoints change and there is a need to passivate idle routes this causes a considerable memory leak (via SFTP in particular).
Test Case
package org.apache.camel.component; import com.google.common.util.concurrent.AtomicLongMap; import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.Route; import org.apache.camel.Service; import org.apache.camel.ServicePoolAware; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultComponent; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.support.LifecycleStrategySupport; import org.apache.camel.support.ServiceSupport; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import java.util.Map; import static com.google.common.base.Preconditions.checkNotNull; /** * Test memory behaviour of producers using {@link ServicePoolAware} when using JMX. */ public class ServicePoolAwareLeakyTest extends CamelTestSupport { private static final String LEAKY_SIEVE_STABLE = "leaky://sieve-stable?plugged=true"; private static final String LEAKY_SIEVE_TRANSIENT = "leaky://sieve-transient?plugged=true"; private static boolean isPatchApplied() { return Boolean.parseBoolean(System.getProperty("patchApplied", "false")); } /** * Component that provides leaks producers. */ private static class LeakySieveComponent extends DefaultComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { boolean plugged = "true".equalsIgnoreCase((String) parameters.remove("plugged")); return new LeakySieveEndpoint(uri, isPatchApplied() && plugged); } } /** * Endpoint that provides leaky producers. */ private static class LeakySieveEndpoint extends DefaultEndpoint { private final String uri; private final boolean plugged; public LeakySieveEndpoint(String uri, boolean plugged) { this.uri = checkNotNull(uri, "uri must not be null"); this.plugged = plugged; } @Override public Producer createProducer() throws Exception { return new LeakySieveProducer(this, plugged); } @Override public Consumer createConsumer(Processor processor) throws Exception { throw new UnsupportedOperationException(); } @Override public boolean isSingleton() { return true; } @Override protected String createEndpointUri() { return uri; } } /** * Leaky producer - implements {@link ServicePoolAware}. */ private static class LeakySieveProducer extends DefaultProducer implements ServicePoolAware { private final boolean plugged; public LeakySieveProducer(Endpoint endpoint, boolean plugged) { super(endpoint); this.plugged = plugged; } @Override public void process(Exchange exchange) throws Exception { // do nothing } @Override protected void doStop() throws Exception { super.doStop(); //noinspection ConstantConditions if (plugged) { // need to remove self from services since we are ServicePoolAware this will not be handled for us otherwise we // leak memory getEndpoint().getCamelContext().removeService(this); } } } @Override protected boolean useJmx() { // only occurs when using JMX as the GC root for the producer is through a ManagedProducer created by the // context.addService() invocation return true; } /** * Returns true if verification of state should be performed during the test as opposed to at the end. */ public boolean isFailFast() { return false; } /** * Returns true if during fast failure we should verify that the service pool remains in the started state. */ public boolean isVerifyProducerServicePoolRemainsStarted() { return false; } @Override public boolean isUseAdviceWith() { return true; } @Test public void testForMemoryLeak() throws Exception { registerLeakyComponent(); final AtomicLongMap<String> references = AtomicLongMap.create(); // track LeakySieveProducer lifecycle context.addLifecycleStrategy(new LifecycleStrategySupport() { @Override public void onServiceAdd(CamelContext context, Service service, Route route) { if (service instanceof LeakySieveProducer) { references.incrementAndGet(((LeakySieveProducer) service).getEndpoint().getEndpointKey()); } } @Override public void onServiceRemove(CamelContext context, Service service, Route route) { if (service instanceof LeakySieveProducer) { references.decrementAndGet(((LeakySieveProducer) service).getEndpoint().getEndpointKey()); } } }); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:sieve-transient") .id("sieve-transient") .to(LEAKY_SIEVE_TRANSIENT); from("direct:sieve-stable") .id("sieve-stable") .to(LEAKY_SIEVE_STABLE); } }); context.start(); for (int i = 0; i < 1000; i++) { ServiceSupport service = (ServiceSupport) context.getProducerServicePool(); assertEquals(ServiceStatus.Started, service.getStatus()); if (isFailFast()) { assertEquals(2, context.getProducerServicePool().size()); assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT)); assertEquals(1, references.get(LEAKY_SIEVE_STABLE)); } context.stopRoute("sieve-transient"); if (isFailFast()) { assertEquals("Expected no service references to remain", 0, references.get(LEAKY_SIEVE_TRANSIENT)); } if (isFailFast()) { // looks like we cleared more than just our route, we've stopped and cleared the global ProducerServicePool // since SendProcessor.stop() invokes ServiceHelper.stopServices(producerCache, producer); which in turn invokes // ServiceHelper.stopAndShutdownService(pool);. // // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is not and effects a stop of the pool. if (isVerifyProducerServicePoolRemainsStarted()) { assertEquals(ServiceStatus.Started, service.getStatus()); } assertEquals("Expected one stable producer to remain pooled", 1, context.getProducerServicePool().size()); assertEquals("Expected one stable producer to remain as service", 1, references.get(LEAKY_SIEVE_STABLE)); } // Send a body to verify behaviour of send producer after another route has been stopped sendBody("direct:sieve-stable", ""); if (isFailFast()) { // shared pool is used despite being 'Stopped' if (isVerifyProducerServicePoolRemainsStarted()) { assertEquals(ServiceStatus.Started, service.getStatus()); } assertEquals("Expected only stable producer in pool", 1, context.getProducerServicePool().size()); assertEquals("Expected no references to transient producer", 0, references.get(LEAKY_SIEVE_TRANSIENT)); assertEquals("Expected reference to stable producer", 1, references.get(LEAKY_SIEVE_STABLE)); } context.startRoute("sieve-transient"); // ok, back to normal assertEquals(ServiceStatus.Started, service.getStatus()); if (isFailFast()) { assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size()); assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT)); assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE)); } } if (!isFailFast()) { assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size()); // if not fixed these will equal the number of iterations in the loop + 1 assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT)); assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE)); } } private void registerLeakyComponent() { // register leaky component context.addComponent("leaky", new LeakySieveComponent()); } }